node-resque
Advanced tools
Comparing version 0.11.6 to 0.11.7
@@ -82,3 +82,3 @@ ///////////////////////// | ||
var worker = new NR.worker({connection: connectionDetails, queues: ['default']}, jobs, function(){ | ||
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers | ||
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers on this host | ||
worker.start(); | ||
@@ -85,0 +85,0 @@ }); |
@@ -47,3 +47,3 @@ ///////////////////////// | ||
var worker = new NR.worker({connection: connectionDetails, queues: ['default']}, jobs, function(){ | ||
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers | ||
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers on this host | ||
worker.start(); | ||
@@ -50,0 +50,0 @@ }); |
@@ -59,3 +59,3 @@ ///////////////////////// | ||
var worker = new NR.worker({connection: connectionDetails, queues: ['math', 'otherQueue']}, jobs, function(){ | ||
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers | ||
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers on this host | ||
worker.start(); | ||
@@ -62,0 +62,0 @@ }); |
@@ -38,3 +38,3 @@ ///////////////////////// | ||
var worker = new NR.worker({connection: connectionDetails, queues: ['*']}, jobs, function(){ | ||
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers | ||
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers on this host | ||
worker.start(); | ||
@@ -41,0 +41,0 @@ }); |
@@ -15,4 +15,4 @@ // If the job fails, sleep, and re-enqueue it. | ||
self.sleep = 1000; | ||
if(self.options.sleep != null){ self.sleep = self.options.sleep; } | ||
} | ||
if(self.options.sleep){ self.sleep = self.options.sleep; } | ||
}; | ||
@@ -26,3 +26,3 @@ //////////////////// | ||
callback(null, true); | ||
} | ||
}; | ||
@@ -32,3 +32,3 @@ simpleRetry.prototype.after_enqueue = function(callback){ | ||
callback(null, true); | ||
} | ||
}; | ||
@@ -38,3 +38,3 @@ simpleRetry.prototype.before_perform = function(callback){ | ||
callback(null, true); | ||
} | ||
}; | ||
@@ -45,15 +45,15 @@ simpleRetry.prototype.after_perform = function(callback){ | ||
if(self.worker.error != null){ | ||
if(self.options.errorCollector != null){ | ||
if(self.worker.error){ | ||
if(self.options.errorCollector){ | ||
self.options.errorCollector.push( self.worker.error ); | ||
} | ||
self.worker.error = null; | ||
setTimeout(function(){ | ||
callback(null, true); | ||
}, self.sleep) | ||
self.worker.queueObject.enqueueIn(self.sleep, self.queue, self.func, self.args, function(err){ | ||
callback(err, true); | ||
}); | ||
}else{ | ||
callback(null, true); | ||
} | ||
} | ||
}; | ||
exports.simpleRetry = simpleRetry; |
163
lib/queue.js
@@ -7,3 +7,3 @@ var util = require('util'); | ||
var self = this; | ||
if(typeof jobs == 'function' && callback == null){ | ||
if(typeof jobs == 'function' && callback === undefined){ | ||
callback = jobs; | ||
@@ -16,4 +16,4 @@ jobs = {}; | ||
self.runPlugin = pluginRunner.runPlugin | ||
self.runPlugins = pluginRunner.runPlugins | ||
self.runPlugin = pluginRunner.runPlugin; | ||
self.runPlugins = pluginRunner.runPlugins; | ||
@@ -24,3 +24,3 @@ self.connection = new connection(options.connection); | ||
}); | ||
} | ||
}; | ||
@@ -33,3 +33,3 @@ queue.prototype.end = function(callback){ | ||
}); | ||
} | ||
}; | ||
@@ -42,10 +42,16 @@ queue.prototype.encode = function(q, func, args){ | ||
}); | ||
} | ||
}; | ||
queue.prototype.enqueue = function(q, func, args, callback){ | ||
var self = this; | ||
var args = arrayify(args); | ||
if(arguments.length === 3 && typeof args === 'function'){ | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 3){ | ||
args = []; | ||
} | ||
args = arrayify(args); | ||
var job = self.jobs[func]; | ||
self.runPlugins('before_enqueue', func, q, job, args, function(err, toRun){ | ||
if(toRun == false){ | ||
if(toRun === false){ | ||
if(typeof callback === 'function'){ callback(err, toRun); } | ||
@@ -64,3 +70,3 @@ }else{ | ||
}); | ||
} | ||
}; | ||
@@ -70,3 +76,9 @@ queue.prototype.enqueueAt = function(timestamp, q, func, args, callback){ | ||
var self = this; | ||
var args = arrayify(args); | ||
if(arguments.length === 4 && typeof args === 'function'){ | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 4){ | ||
args = []; | ||
} | ||
args = arrayify(args); | ||
self.connection.ensureConnected(callback, function(){ | ||
@@ -90,3 +102,9 @@ var item = self.encode(q, func, args); | ||
var self = this; | ||
var args = arrayify(args); | ||
if(arguments.length === 4 && typeof args === 'function'){ | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 4){ | ||
args = []; | ||
} | ||
args = arrayify(args); | ||
var timestamp = (new Date().getTime()) + time; | ||
@@ -96,3 +114,3 @@ self.enqueueAt(timestamp, q, func, args, function(){ | ||
}); | ||
} | ||
}; | ||
@@ -106,3 +124,3 @@ queue.prototype.queues = function(callback){ | ||
}); | ||
} | ||
}; | ||
@@ -114,3 +132,3 @@ queue.prototype.delQueue = function(q, callback){ | ||
}); | ||
} | ||
}; | ||
@@ -124,11 +142,20 @@ queue.prototype.length = function(q, callback){ | ||
}); | ||
} | ||
}; | ||
queue.prototype.del = function(q, func, args, count, callback){ | ||
var self = this; | ||
var args = arrayify(args); | ||
if(typeof count == 'function' && callback == null){ | ||
if(arguments.length === 4 && typeof count == 'function'){ | ||
callback = count; | ||
count = 0; // remove first enqueued items that match | ||
count = 0; | ||
}else if(arguments.length === 3){ | ||
if(typeof args == 'function'){ | ||
callback = args; | ||
args = []; | ||
} | ||
count = 0; | ||
}else if(arguments.length < 3){ | ||
args = []; | ||
count = 0; | ||
} | ||
args = arrayify(args); | ||
self.connection.ensureConnected(callback, function(){ | ||
@@ -139,11 +166,17 @@ self.connection.redis.lrem(self.connection.key('queue', q), count, self.encode(q, func, args), function(err, count){ | ||
}); | ||
} | ||
}; | ||
queue.prototype.delDelayed = function(q, func, args, callback){ | ||
var self = this; | ||
var args = arrayify(args); | ||
if(arguments.length === 3 && typeof args === 'function'){ | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 3){ | ||
args = []; | ||
} | ||
args = arrayify(args); | ||
var search = self.encode(q, func, args); | ||
self.connection.ensureConnected(callback, function(){ | ||
var timestamps = self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){ | ||
if(members.length == 0 ){ if(typeof callback === 'function'){ callback(err, []); } } | ||
if(members.length === 0 ){ if(typeof callback === 'function'){ callback(err, []); } } | ||
else{ | ||
@@ -158,7 +191,7 @@ var started = 0; | ||
started--; | ||
if(started == 0){ | ||
if(started === 0){ | ||
if(typeof callback === 'function'){ callback(err, timestamps); } | ||
} | ||
}) | ||
}) | ||
}); | ||
}); | ||
}); | ||
@@ -168,7 +201,13 @@ } | ||
}); | ||
} | ||
}; | ||
queue.prototype.scheduledAt = function(q, func, args, callback){ | ||
var self = this; | ||
var args = arrayify(args); | ||
if(arguments.length === 3 && typeof args === 'function'){ | ||
callback = args; | ||
args = []; | ||
}else if(arguments.length < 3){ | ||
args = []; | ||
} | ||
args = arrayify(args); | ||
var search = self.encode(q, func, args); | ||
@@ -178,6 +217,6 @@ self.connection.ensureConnected(callback, function(){ | ||
var timestamps = []; | ||
if(members != null){ | ||
if(members !== null){ | ||
members.forEach(function(key){ | ||
timestamps.push(key.split(":")[key.split(":").length - 1]); | ||
}) | ||
}); | ||
} | ||
@@ -187,3 +226,3 @@ if(typeof callback === 'function'){ callback(err, timestamps); } | ||
}); | ||
} | ||
}; | ||
@@ -201,3 +240,3 @@ queue.prototype.timestamps = function(callback){ | ||
}); | ||
} | ||
}; | ||
@@ -214,3 +253,3 @@ queue.prototype.delayedAt = function(timestamp, callback){ | ||
}); | ||
} | ||
}; | ||
@@ -222,16 +261,20 @@ queue.prototype.allDelayed = function(callback){ | ||
self.timestamps(function(err, timestamps){ | ||
timestamps.forEach(function(timestamp){ | ||
started++; | ||
self.delayedAt(timestamp, function(err, tasks, rTimestamp){ | ||
results[(rTimestamp * 1000)] = tasks; | ||
started--; | ||
if(started === 0){ callback(err, results) } | ||
if(timestamps.length === 0){ | ||
callback(err, {}); | ||
}else{ | ||
timestamps.forEach(function(timestamp){ | ||
started++; | ||
self.delayedAt(timestamp, function(err, tasks, rTimestamp){ | ||
results[(rTimestamp * 1000)] = tasks; | ||
started--; | ||
if(started === 0){ callback(err, results); } | ||
}); | ||
}); | ||
}); | ||
} | ||
}); | ||
} | ||
}; | ||
queue.prototype.workers = function(callback){ | ||
var self = this; | ||
var workers = {} | ||
var workers = {}; | ||
self.connection.redis.smembers(self.connection.key('workers'), function(err, results){ | ||
@@ -243,19 +286,19 @@ if(results){ | ||
if(parts.length === 1){ | ||
var name = parts[0]; | ||
name = parts[0]; | ||
workers[name] = null; | ||
} | ||
else if(parts.length === 2){ | ||
var name = parts[0]; | ||
var queues = parts[1]; | ||
name = parts[0]; | ||
queues = parts[1]; | ||
workers[name] = queues; | ||
}else{ | ||
var name = parts.shift() + ":" + parts.shift(); | ||
var queues = parts.join(':'); | ||
name = parts.shift() + ":" + parts.shift(); | ||
queues = parts.join(':'); | ||
workers[name] = queues; | ||
} | ||
}) | ||
}); | ||
} | ||
if(typeof callback === 'function'){ callback(err, workers); } | ||
}); | ||
} | ||
}; | ||
@@ -268,3 +311,3 @@ queue.prototype.workingOn = function(workerName, queues, callback){ | ||
}); | ||
} | ||
}; | ||
@@ -276,4 +319,4 @@ queue.prototype.allWorkingOn = function(callback){ | ||
self.workers(function(err, workers){ | ||
if(err && typeof callback === 'function'){ | ||
callback(err); | ||
if(err && typeof callback === 'function'){ | ||
callback(err, results); | ||
}else if(!workers || hashLength(workers) === 0){ | ||
@@ -287,14 +330,14 @@ callback(null, results); | ||
counter--; | ||
if(data){ | ||
if(data){ | ||
data = JSON.parse(data); | ||
results[data.worker] = data; | ||
results[data.worker] = data; | ||
} | ||
if(counter === 0 && typeof callback === 'function'){ | ||
callback(err, results); | ||
callback(err, results); | ||
} | ||
}); | ||
}; | ||
} | ||
} | ||
}); | ||
} | ||
}; | ||
@@ -306,3 +349,3 @@ ///////////// | ||
var arrayify = function(o){ | ||
if( Array.isArray(o) ) { | ||
if(Array.isArray(o)){ | ||
return o; | ||
@@ -312,3 +355,3 @@ }else{ | ||
} | ||
} | ||
}; | ||
@@ -318,7 +361,7 @@ var hashLength = function(obj) { | ||
for(key in obj){ | ||
if(obj.hasOwnProperty(key)){ size++ } | ||
if(obj.hasOwnProperty(key)){ size++; } | ||
} | ||
return size; | ||
} | ||
}; | ||
exports.queue = queue; | ||
exports.queue = queue; |
@@ -12,7 +12,7 @@ // TODO: Locking like ruby does | ||
for(var i in defaults){ | ||
if(options[i] == null){ | ||
if(options[i] === null || options[i] === undefined){ | ||
options[i] = defaults[i]; | ||
} | ||
} | ||
if(typeof jobs == 'function' && callback == null){ | ||
if(typeof jobs == 'function' && !callback){ | ||
callback = jobs; | ||
@@ -29,3 +29,3 @@ jobs = {}; | ||
}); | ||
} | ||
}; | ||
@@ -38,4 +38,4 @@ util.inherits(scheduler, EventEmitter); | ||
timeout: 5000, | ||
} | ||
} | ||
}; | ||
}; | ||
@@ -45,3 +45,3 @@ scheduler.prototype.start = function() { | ||
if (!self.running) { | ||
self.emit('start') | ||
self.emit('start'); | ||
self.running = true; | ||
@@ -58,3 +58,3 @@ self.processing = false; | ||
self.running = false; | ||
if(self.processing == false){ | ||
if(self.processing === false){ | ||
clearTimeout(self.timer); | ||
@@ -65,3 +65,3 @@ self.emit('end'); | ||
}); | ||
}else if(self.processing == true){ | ||
}else if(self.processing === true){ | ||
setTimeout(function(){ | ||
@@ -145,3 +145,3 @@ self.end(callback); | ||
var self = this; | ||
self.queue.enqueue(job["queue"], job["class"], job.args, function(err){ | ||
self.queue.enqueue(job.queue, job.class, job.args, function(err){ | ||
if(err){ self.emit('error', err); } | ||
@@ -148,0 +148,0 @@ self.emit('transferred_job', timestamp, job); |
@@ -14,3 +14,3 @@ var os = require("os"); | ||
for(var i in defaults){ | ||
if(options[i] == null){ | ||
if(options[i] === undefined || options[i] === null){ | ||
options[i] = defaults[i]; | ||
@@ -29,4 +29,4 @@ } | ||
self.runPlugin = pluginRunner.runPlugin | ||
self.runPlugins = pluginRunner.runPlugins | ||
self.runPlugin = pluginRunner.runPlugin; | ||
self.runPlugins = pluginRunner.runPlugins; | ||
@@ -39,3 +39,3 @@ self.queueObject = new queue({connection: options.connection}, function(err){ | ||
}); | ||
} | ||
}; | ||
@@ -51,4 +51,4 @@ util.inherits(worker, EventEmitter); | ||
looping: true, | ||
} | ||
} | ||
}; | ||
}; | ||
@@ -68,3 +68,3 @@ worker.prototype.start = function() { | ||
self.running = false; | ||
if (self.working == true){ | ||
if (self.working === true){ | ||
setTimeout(function(){ | ||
@@ -83,3 +83,3 @@ self.end(callback); | ||
var self = this; | ||
if (nQueue == null) { | ||
if (nQueue === null || nQueue === undefined) { | ||
nQueue = 0; | ||
@@ -92,3 +92,3 @@ } | ||
self.emit('poll', self.queue); | ||
if(self.queue == null){ | ||
if(self.queue === null || self.queue === undefined){ | ||
self.checkQueues(function(){ | ||
@@ -111,3 +111,3 @@ self.pause(); | ||
}else{ | ||
if(err != null){ | ||
if(err){ | ||
self.emit('error', self.queue, null, err); | ||
@@ -145,9 +145,9 @@ } | ||
self.error = null; | ||
if (self.jobs[job["class"]] == null){ | ||
if (!self.jobs[job["class"]]){ | ||
self.error = new Error("No job defined for class '"+job["class"]+"'"); | ||
self.completeJob(null, true, callback); | ||
}else{ | ||
var cb = self.jobs[job["class"]]["perform"]; | ||
var cb = self.jobs[job["class"]].perform; | ||
self.emit('job', self.queue, job); | ||
if(cb != null) { | ||
if(cb) { | ||
var returnCounter = 0; // a state counter to prevent multiple returns from poor jobs or plugins | ||
@@ -159,3 +159,3 @@ var callbackError = new Error('refusing to continue with job, multiple callbacks detected'); | ||
self.emit('failure', self.queue, job, callbackError); | ||
}else if(toRun == false){ | ||
}else if(toRun === false){ | ||
self.completeJob(null, false, callback); | ||
@@ -165,6 +165,7 @@ }else{ | ||
self.workingOn(job); | ||
if(job.args == null || (job.args instanceof Array) === true){ | ||
var args = job.args; | ||
var args; | ||
if(job.args === undefined || (job.args instanceof Array) === true){ | ||
args = job.args; | ||
}else{ | ||
var args = [job.args]; | ||
args = [job.args]; | ||
} | ||
@@ -178,3 +179,3 @@ cb.apply(self, [].slice.call(args).concat([function(err, result){ | ||
self.runPlugins('after_perform', job["class"], self.queue, self.jobs[job["class"]], job.args, function(e, toRun){ | ||
if(self.error == null && e != null){ self.error = e; } | ||
if(self.error === undefined && e){ self.error = e; } | ||
returnCounter++; | ||
@@ -202,3 +203,3 @@ if(returnCounter !== 3){ | ||
var job = self.job; | ||
if(self.error != null){ | ||
if(self.error){ | ||
self.fail(self.error, job); | ||
@@ -248,3 +249,3 @@ }else if(toRespond){ | ||
self.connection.redis.set(self.connection.key('worker', self.name, self.stringQueues()), JSON.stringify({ | ||
run_at: (new Date).toString(), | ||
run_at: (new Date()).toString(), | ||
queue: self.queue, | ||
@@ -288,3 +289,3 @@ payload: job, | ||
self.track(); | ||
self.connection.redis.set(self.connection.key('worker', self.name, self.stringQueues(), 'started'), (new Date).toString(), function(){ | ||
self.connection.redis.set(self.connection.key('worker', self.name, self.stringQueues(), 'started'), (new Date()).toString(), function(){ | ||
if(typeof callback === 'function'){ callback(); } | ||
@@ -298,2 +299,3 @@ }); | ||
self.connection.redis.smembers(self.connection.key('workers'), function(err, workers){ | ||
if(err){ throw err; } | ||
workers.forEach(function(w){ | ||
@@ -307,5 +309,5 @@ var parts = w.split(":"); | ||
var queues = parts.splice(-1, 1); | ||
var pureName = parts.join(':') | ||
var pureName = parts.join(':'); | ||
self.untrack(pureName, queues); | ||
})(w) | ||
})(w); | ||
} | ||
@@ -316,3 +318,3 @@ }); | ||
}); | ||
} | ||
}; | ||
@@ -338,3 +340,3 @@ worker.prototype.getPids = function(callback){ | ||
}); | ||
} | ||
}; | ||
@@ -346,3 +348,3 @@ worker.prototype.checkQueues = function(callback){ | ||
} | ||
if (self.ready === true && self.queues.length > 0 && self.queues.shift != null) { | ||
if (self.ready === true && self.queues.length > 0 && self.queues.shift) { | ||
return; | ||
@@ -377,3 +379,3 @@ } | ||
backtrace: err.stack ? err.stack.split('\n').slice(1) : null, | ||
failed_at: (new Date).toString() | ||
failed_at: (new Date()).toString() | ||
}; | ||
@@ -384,3 +386,3 @@ }; | ||
var self = this; | ||
if(self.queues.length == 0){ | ||
if(self.queues.length === 0){ | ||
return ["*"].join(","); | ||
@@ -390,3 +392,3 @@ }else{ | ||
} | ||
} | ||
}; | ||
@@ -393,0 +395,0 @@ function prepareJobs(jobs) { |
@@ -5,3 +5,3 @@ { | ||
"description": "an opinionated implementation of resque in node", | ||
"version": "0.11.6", | ||
"version": "0.11.7", | ||
"homepage": "http://github.com/taskrabbit/node-resque", | ||
@@ -8,0 +8,0 @@ "repository": { |
@@ -33,3 +33,3 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; | ||
queue = new specHelper.NR.queue({connection: connectionDetails, queue: specHelper.queue}, function(err){ | ||
if(resolved === false){ // new versions of redis will keep retrying in node v0.11x... | ||
if(resolved === false){ // new versions of redis will keep retrying in node v0.11x... | ||
should.exist(err); | ||
@@ -70,3 +70,3 @@ resolved = true; | ||
obj['class'].should.equal('someJob'); | ||
// obj['args'].should.equal([1,2,3]); | ||
obj['args'].should.eql([1,2,3]); | ||
done(); | ||
@@ -85,3 +85,3 @@ }); | ||
obj['class'].should.equal('someJob'); | ||
// obj['args'].should.equal([1,2,3]); | ||
obj['args'].should.eql([1,2,3]); | ||
done(); | ||
@@ -102,3 +102,3 @@ }); | ||
obj['class'].should.equal('someJob'); | ||
// obj['args'].should.equal([1,2,3]); | ||
obj['args'].should.eql([1,2,3]); | ||
done(); | ||
@@ -164,2 +164,89 @@ }); | ||
it('can handle single arguments without explicit array', function(done){ | ||
queue.enqueue(specHelper.queue, 'someJob', 1, function(){ | ||
specHelper.popFromQueue(function(err, obj){ | ||
JSON.parse(obj)['args'].should.eql([1]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('allows omitting arguments when enqueuing', function(done){ | ||
queue.enqueue(specHelper.queue, 'noParams'); // no callback here, but in practice will finish before next enqueue calls back | ||
queue.enqueue(specHelper.queue, 'noParams', function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(2); | ||
specHelper.popFromQueue(function(err, obj){ | ||
obj = JSON.parse(obj); | ||
obj['class'].should.equal('noParams'); | ||
obj['args'].should.be.empty; | ||
specHelper.popFromQueue(function(err, obj){ | ||
obj = JSON.parse(obj); | ||
obj['class'].should.equal('noParams'); | ||
obj['args'].should.be.empty; | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('allows omitting arguments when deleting', function(done){ | ||
queue.enqueue(specHelper.queue, 'noParams', [], function(){ | ||
queue.enqueue(specHelper.queue, 'noParams', [], function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(2); | ||
queue.del(specHelper.queue, 'noParams'); | ||
queue.del(specHelper.queue, 'noParams', function(err, len){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(0); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('allows omitting arguments when adding delayed job', function(done){ | ||
queue.allDelayed(function(err, hash){ | ||
hash.should.be.empty; | ||
queue.enqueueAt(10000, specHelper.queue, 'noParams'); | ||
queue.enqueueIn(11000, specHelper.queue, 'noParams'); | ||
queue.enqueueAt(12000, specHelper.queue, 'noParams', function(){ | ||
queue.enqueueIn(13000, specHelper.queue, 'noParams', function(){ | ||
queue.scheduledAt(specHelper.queue, 'noParams', function(err, timestamps){ | ||
timestamps.length.should.equal(4); | ||
queue.allDelayed(function(err, hash){ | ||
Object.keys(hash).length.should.equal(4); | ||
for(var key in hash){ | ||
hash[key][0].args.should.be.empty; | ||
} | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('allows omitting arguments when deleting a delayed job', function(done){ | ||
queue.allDelayed(function(err, hash){ | ||
hash.should.be.empty; | ||
queue.enqueueAt(10000, specHelper.queue, 'noParams'); | ||
queue.enqueueAt(12000, specHelper.queue, 'noParams', function(){ | ||
queue.allDelayed(function(err, hash){ | ||
Object.keys(hash).length.should.equal(2); | ||
queue.delDelayed(specHelper.queue, 'noParams'); | ||
queue.delDelayed(specHelper.queue, 'noParams', function(){ | ||
queue.allDelayed(function(err, hash){ | ||
hash.should.be.empty; | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe('delayed status', function(){ | ||
@@ -193,3 +280,3 @@ | ||
tasks_a[1].class.should.equal('job2'); | ||
queue.delayedAt(20000, function(err, tasks_b){ | ||
queue.delayedAt(20000, function(err, tasks_b){ | ||
should.not.exist(err); | ||
@@ -233,11 +320,11 @@ tasks_b.length.should.equal(1); | ||
workerA = new specHelper.NR.worker({ | ||
connection: specHelper.connectionDetails, | ||
timeout: specHelper.timeout, | ||
queues: specHelper.queue, | ||
connection: specHelper.connectionDetails, | ||
timeout: specHelper.timeout, | ||
queues: specHelper.queue, | ||
name: 'workerA' | ||
}, jobs, function(){ | ||
workerB = new specHelper.NR.worker({ | ||
connection: specHelper.connectionDetails, | ||
timeout: specHelper.timeout, | ||
queues: specHelper.queue, | ||
connection: specHelper.connectionDetails, | ||
timeout: specHelper.timeout, | ||
queues: specHelper.queue, | ||
name: 'workerB' | ||
@@ -282,3 +369,3 @@ }, jobs, function(){ | ||
workerA.removeAllListeners('job'); | ||
queue.allWorkingOn(function(err, data){ | ||
@@ -292,3 +379,3 @@ should.not.exist(err); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
@@ -303,2 +390,2 @@ | ||
}); | ||
}); |
@@ -34,3 +34,3 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; | ||
scheduler = new specHelper.NR.scheduler({connection: connectionDetails, timeout: specHelper.timeout}, function(err){ | ||
if(resolved === false){ // new versions of redis will keep retrying in node v0.11x... | ||
if(resolved === false){ // new versions of redis will keep retrying in node v0.11x... | ||
should.exist(err); | ||
@@ -85,3 +85,3 @@ resolved = true; | ||
obj['class'].should.equal('someJob'); | ||
// obj['args'].should.equal([1,2,3]); | ||
obj['args'].should.eql([1,2,3]); | ||
done(); | ||
@@ -105,2 +105,2 @@ }); | ||
}); | ||
}); |
@@ -60,3 +60,3 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; | ||
worker = new specHelper.NR.worker({connection: connectionDetails, timeout: specHelper.timeout}, jobs, function(err){ | ||
if(resolved === false){ // new versions of redis will keep retrying in node v0.11x... | ||
if(resolved === false){ // new versions of redis will keep retrying in node v0.11x... | ||
should.exist(err); | ||
@@ -171,3 +171,2 @@ resolved = true; | ||
it('will not work jobs that are not defined', function(done){ | ||
@@ -174,0 +173,0 @@ var listener = worker.on('failure', function(q, job, failure){ |
Sorry, the diff of this file is not supported yet
136021
3123
327