Comparing version 1.0.0 to 1.1.0
@@ -0,1 +1,12 @@ | ||
v1.1.0 | ||
====== | ||
- Fixed [job corruption issue](https://github.com/OptimalBits/bull/pull/359) | ||
- The job id can be [overridden](https://github.com/OptimalBits/bull/pull/335) to implement job throttling behavior | ||
- Added [`removeOnComplete` job option](https://github.com/OptimalBits/bull/pull/361) | ||
- [More robust job retry](https://github.com/OptimalBits/bull/pull/318) | ||
- Events are [now broadcast to all workers](https://github.com/OptimalBits/bull/commit/d55ad1c8f44f86be9b4e9f4fa9a3fc8a16c6e02d) | ||
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0...v1.1.0) | ||
v1.0.0 | ||
@@ -2,0 +13,0 @@ ====== |
205
lib/job.js
@@ -18,3 +18,3 @@ /*eslint-env node */ | ||
// queue: Queue, jobId: string, data: {}, opts: JobOptions | ||
// queue: Queue, data: {}, opts: JobOptions | ||
var Job = function(queue, data, opts){ | ||
@@ -44,3 +44,3 @@ opts = opts || {}; | ||
var toKey = _.bind(queue.toKey, queue); | ||
return scripts.addJob(queue.client, toKey, job.opts.lifo, jobData); | ||
return scripts.addJob(queue.client, toKey, jobData, { lifo: job.opts.lifo, customJobId: job.opts.jobId }); | ||
} | ||
@@ -53,8 +53,4 @@ | ||
job.jobId = jobId; | ||
queue.emit('waiting', job); | ||
queue.distEmit('waiting', job.toJSON()); | ||
debuglog('Job added', jobId); | ||
// | ||
// TODO: emit event based on pubsub | ||
//_this.emit('delayed', job); | ||
// | ||
return job; | ||
@@ -71,3 +67,3 @@ }); | ||
if(jobData){ | ||
return Job.fromData(queue, +jobId, jobData); | ||
return Job.fromData(queue, jobId, jobData); | ||
}else{ | ||
@@ -97,3 +93,3 @@ return jobData; | ||
return this.queue.client.hsetAsync(this.queue.toKey(this.jobId), 'progress', progress).then(function(){ | ||
_this.queue.emit('progress', _this, progress); | ||
_this.queue.distEmit('progress', _this.toJSON(), progress); | ||
}); | ||
@@ -105,4 +101,23 @@ }else{ | ||
// | ||
// toData and fromData should be deprecated. | ||
// | ||
Job.prototype.toJSON = function(){ | ||
var opts = this.opts || {}; | ||
opts.jobId = this.jobId; | ||
return { | ||
data: this.data || {}, | ||
opts: opts, | ||
progress: this._progress, | ||
delay: this.delay, | ||
timestamp: this.timestamp, | ||
attempts: this.attempts, | ||
attemptsMade: this.attemptsMade, | ||
stacktrace: this.stacktrace || null, | ||
returnvalue: this.returnvalue || null | ||
}; | ||
}; | ||
/** | ||
Return a unique key representin a lock for this Job | ||
Return a unique key representing a lock for this Job | ||
*/ | ||
@@ -118,9 +133,4 @@ Job.prototype.lockKey = function(){ | ||
Job.prototype.takeLock = function(token, renew){ | ||
var args = [this.lockKey(), token, 'PX', this.queue.LOCK_RENEW_TIME]; | ||
if(!renew){ | ||
args.push('NX'); | ||
} | ||
return this.queue.client.setAsync.apply(this.queue.client, args).then(function(result){ | ||
return result === 'OK'; | ||
return scripts.takeLock(this.queue, this, token, renew).then(function(res){ | ||
return res === 1; // Indicates successful lock. | ||
}); | ||
@@ -133,3 +143,3 @@ }; | ||
Job.prototype.renewLock = function(token){ | ||
return this.takeLock(token, true); | ||
return this.takeLock(token, true /* Renew */); | ||
}; | ||
@@ -158,9 +168,18 @@ | ||
this.returnvalue = returnValue || 0; | ||
return scripts.moveToCompleted(this, token || 0); | ||
return scripts.moveToCompleted(this, token || 0, this.opts.removeOnComplete); | ||
}; | ||
Job.prototype.move = function(src, target, token, returnValue){ | ||
if(target === 'completed'){ | ||
this.returnvalue = returnValue || 0; | ||
if(this.opts.removeOnComplete){ | ||
target = void 0; | ||
} | ||
} | ||
return scripts.move(this, token || 0, src, target); | ||
} | ||
Job.prototype.moveToFailed = function(err){ | ||
var _this = this; | ||
this.stacktrace.push(err.stack); | ||
return this._saveAttempt().then(function() { | ||
return this._saveAttempt(err).then(function() { | ||
// Check if an automatic retry should be performed | ||
@@ -216,17 +235,22 @@ if(_this.attemptsMade < _this.attempts){ | ||
/** | ||
* Attempts to retry the job. Only a job that has failed can be retried. | ||
* | ||
* @return {Promise} If resolved and return code is 1, then the queue emits a waiting event | ||
* otherwise the operation was not a success and throw the corresponding error. If the promise | ||
* rejects, it indicates that the script failed to execute | ||
*/ | ||
Job.prototype.retry = function(){ | ||
var key = this.queue.toKey('wait'); | ||
var failed = this.queue.toKey('failed'); | ||
var channel = this.queue.toKey('jobs'); | ||
var multi = this.queue.multi(); | ||
var queue = this.queue; | ||
var _this = this; | ||
multi.srem(failed, this.jobId); | ||
// if queue is LIFO use rpushAsync | ||
multi[(this.opts.lifo ? 'r' : 'l') + 'push'](key, this.jobId); | ||
multi.publish(channel, this.jobId); | ||
return multi.execAsync().then(function(){ | ||
_this.queue.emit('waiting', _this); | ||
return _this; | ||
return scripts.reprocessJob(this, { state: 'failed' }).then(function(result) { | ||
if (result === 1) { | ||
queue.emit('waiting', _this); | ||
} else if (result === 0) { | ||
throw new Error('Couldn\'t retry job: The job doesn\'t exist'); | ||
} else if (result === -1) { | ||
throw new Error('Couldn\'t retry job: The job is locked'); | ||
} else if (result === -2) { | ||
throw new Error('Couldn\'t retry job: The job has been already retried or has not failed'); | ||
} | ||
}); | ||
@@ -304,6 +328,5 @@ }; | ||
} | ||
return scripts.remove(queue, job.jobId) | ||
.then(function() { | ||
queue.emit('removed', job); | ||
queue.emit('removed', job.toJSON()); | ||
}) | ||
@@ -316,2 +339,68 @@ .finally(function () { | ||
/** | ||
* Returns a promise the resolves when the job has been finished. | ||
* TODO: Add a watchdog to check if the job has finished periodically. | ||
* since pubsub does not give any guarantees. | ||
*/ | ||
Job.prototype.finished = function(){ | ||
var _this = this; | ||
function status(resolve, reject){ | ||
return _this.isCompleted().then(function(completed){ | ||
if(!completed){ | ||
return _this.isFailed().then(function(failed){ | ||
if(failed){ | ||
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(function(data){ | ||
reject(Error(data.failedReason)); | ||
return true; | ||
}); | ||
} | ||
}); | ||
} | ||
resolve(); | ||
return true; | ||
}); | ||
} | ||
return new Promise(function(resolve, reject){ | ||
status(resolve, reject).then(function(finished){ | ||
if(!finished){ | ||
function onCompleted(job){ | ||
if(job.jobId === _this.jobId){ | ||
resolve(); | ||
} | ||
removeListeners(); | ||
} | ||
function onFailed(job, err){ | ||
if(job.jobId === _this.jobId){ | ||
reject(err); | ||
} | ||
removeListeners(); | ||
} | ||
function removeListeners(){ | ||
_this.queue.removeListener('completed', onCompleted); | ||
_this.queue.removeListener('failed', onFailed); | ||
} | ||
_this.queue.on('completed', onCompleted); | ||
_this.queue.on('failed', onFailed); | ||
// | ||
// Watchdog | ||
// | ||
var interval = setInterval(function(){ | ||
status(resolve, reject).then(function(finished){ | ||
if(finished){ | ||
removeListeners(); | ||
clearInterval(interval ); | ||
} | ||
}) | ||
}, 5000); | ||
}; | ||
}); | ||
}); | ||
} | ||
// ----------------------------------------------------------------------------- | ||
@@ -403,3 +492,3 @@ // Private methods | ||
Job.prototype._saveAttempt = function(){ | ||
Job.prototype._saveAttempt = function(err){ | ||
if(isNaN(this.attemptsMade)){ | ||
@@ -413,5 +502,8 @@ this.attemptsMade = 1; | ||
}; | ||
if(this.stacktrace){ | ||
params.stacktrace = JSON.stringify(this.stacktrace); | ||
} | ||
this.stacktrace.push(err.stack); | ||
params.stacktrace = JSON.stringify(this.stacktrace); | ||
params.failedReason = err.message; | ||
return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params); | ||
@@ -428,2 +520,4 @@ }; | ||
job.timestamp = parseInt(data.timestamp); | ||
job.failedReason = data.failedReason; | ||
job.attempts = parseInt(data.attempts); | ||
@@ -454,2 +548,33 @@ if(isNaN(job.attempts)) { | ||
Job.fromJSON = function(queue, json){ | ||
var job = new Job(queue, json.data, json.opts); | ||
job.jobId = json.opts.jobId; | ||
job._progress = parseInt(json.progress); | ||
job.delay = parseInt(json.delay); | ||
job.timestamp = parseInt(json.timestamp); | ||
job.attempts = parseInt(json.attempts); | ||
if(isNaN(job.attempts)) { | ||
job.attempts = 1; // Default to 1 try for legacy jobs | ||
} | ||
job.attemptsMade = parseInt(json.attemptsMade); | ||
var _traces; | ||
try{ | ||
_traces = JSON.parse(json.stacktrace); | ||
if(!(_traces instanceof Array)){ | ||
_traces = []; | ||
} | ||
}catch (err){ | ||
_traces = []; | ||
} | ||
job.stacktrace = _traces; | ||
try{ | ||
job.returnvalue = JSON.parse(json.returnvalue); | ||
}catch (e){ | ||
//swallow exception because the returnvalue got corrupted somehow. | ||
debuglog('corrupted returnvalue: ' + json.returnvalue, e); | ||
} | ||
return job; | ||
} | ||
module.exports = Job; |
@@ -123,3 +123,3 @@ "use strict"; | ||
var fn = function() { | ||
return queue.processStalledJobs().then(queue.getNextJob.bind(queue, { | ||
return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, { | ||
block: false | ||
@@ -126,0 +126,0 @@ })) |
148
lib/queue.js
@@ -5,4 +5,6 @@ /*eslint-env node */ | ||
var redis = require('redis'); | ||
var events = require('events'); | ||
var Disturbed = require('disturbed'); | ||
var util = require('util'); | ||
var assert = require('assert'); | ||
var url = require('url'); | ||
var Job = require('./job'); | ||
@@ -16,2 +18,3 @@ var scripts = require('./scripts'); | ||
var debuglog = require('debuglog')('bull'); | ||
Promise.promisifyAll(redis.RedisClient.prototype); | ||
@@ -49,2 +52,10 @@ Promise.promisifyAll(redis.Multi.prototype); | ||
var LOCK_RENEW_TIME = 5000; // 5 seconds is the renew time. | ||
// The interval for which to check for stalled jobs. | ||
var STALLED_JOB_CHECK_INTERVAL = 5000; // 5 seconds is the renew time. | ||
// The maximum number of times a job can be recovered from the 'stalled' state | ||
// (moved back to 'wait'), before it is failed. | ||
var MAX_STALLED_JOB_COUNT = 1; | ||
var CLIENT_CLOSE_TIMEOUT_MS = 5000; | ||
@@ -65,2 +76,16 @@ var POLLING_INTERVAL = 5000; | ||
redisOptions.db = redisOpts.DB; | ||
} else if(_.isString(redisPort)) { | ||
try { | ||
var redisUrl = url.parse(redisPort); | ||
assert(_.isObject(redisHost) || _.isUndefined(redisHost), | ||
'Expected an object as redis option'); | ||
redisOptions = redisHost || {}; | ||
redisPort = redisUrl.port; | ||
redisHost = redisUrl.hostname; | ||
if (redisUrl.auth) { | ||
redisOptions.password = redisUrl.auth.split(':')[1]; | ||
} | ||
} catch (e) { | ||
throw new Error(e.message); | ||
} | ||
} | ||
@@ -117,2 +142,4 @@ | ||
this.LOCK_RENEW_TIME = LOCK_RENEW_TIME; | ||
this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL; | ||
this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT; | ||
@@ -146,14 +173,39 @@ // bubble up Redis error events | ||
Disturbed.call(this, _this.client, _this.eclient); | ||
// | ||
// Listen distributed queue events | ||
// | ||
listenDistEvent('stalled'); // | ||
listenDistEvent('completed'); // | ||
listenDistEvent('failed'); // | ||
listenDistEvent('cleaned'); | ||
listenDistEvent('waiting'); // | ||
listenDistEvent('remove'); // | ||
listenDistEvent('progress'); // | ||
function listenDistEvent(eventName){ | ||
var _eventName = eventName + '@' + name; | ||
_this.on(_eventName, function(){ | ||
var args = Array.prototype.slice.call(arguments); | ||
if(eventName !== 'cleaned'){ | ||
args[0] = Job.fromJSON(_this, args[0]); | ||
} | ||
args.unshift(eventName); | ||
_this.emit.apply(_this, args); | ||
}); | ||
} | ||
// | ||
// Handle delay, pause and resume messages | ||
// | ||
var delayedKey = _this.toKey('delayed'); | ||
var pausedKey = _this.toKey('paused'); | ||
this.eclient.on('message', function(channel, message){ | ||
if(channel === _this.toKey('delayed')){ | ||
if(channel === delayedKey){ | ||
_this.updateDelayTimer(message); | ||
}else if(channel === _this.toKey('paused')){ | ||
if(message === 'paused'){ | ||
_this.emit('paused'); | ||
}else if(message === 'resumed'){ | ||
_this.emit('resumed'); | ||
} | ||
}else if(channel === pausedKey){ | ||
_this.emit(message); | ||
} | ||
@@ -190,4 +242,3 @@ }); | ||
// in processJobs etc. | ||
this.processStalledJobs = this.processStalledJobs.bind(this); | ||
this.processStalledJob = this.processStalledJob.bind(this); | ||
this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this); | ||
this.getNextJob = this.getNextJob.bind(this); | ||
@@ -199,4 +250,14 @@ this.processJobs = this.processJobs.bind(this); | ||
util.inherits(Queue, events.EventEmitter); | ||
util.inherits(Queue, Disturbed); | ||
/** | ||
* | ||
* Emits a distributed event. | ||
*/ | ||
Queue.prototype.distEmit = function(){ | ||
var args = Array.prototype.slice.call(arguments); | ||
args[0] = args[0] + '@' + this.name; | ||
return Disturbed.prototype.distEmit.apply(this, args); | ||
} | ||
Queue.prototype.disconnect = function(){ | ||
@@ -235,3 +296,3 @@ var _this = this; | ||
clearInterval(_this.guardianTimer); | ||
clearInterval(_this.stalledJobsInterval); | ||
clearInterval(_this.moveUnlockedJobsToWaitInterval); | ||
_this.timers.clearAll(); | ||
@@ -435,3 +496,3 @@ | ||
return this.processStalledJobs().then(function(){ | ||
return this.moveUnlockedJobsToWait().then(function(){ | ||
@@ -442,8 +503,7 @@ while(concurrency--){ | ||
// | ||
// Set process Stalled jobs intervall | ||
// | ||
clearInterval(_this.stalledJobsInterval); | ||
_this.stalledJobsInterval = | ||
setInterval(_this.processStalledJobs, _this.LOCK_RENEW_TIME); | ||
if (_this.STALLED_JOB_CHECK_INTERVAL > 0){ | ||
clearInterval(_this.moveUnlockedJobsToWaitInterval); | ||
_this.moveUnlockedJobsToWaitInterval = | ||
setInterval(_this.moveUnlockedJobsToWait, _this.STALLED_JOB_CHECK_INTERVAL); | ||
} | ||
@@ -489,16 +549,28 @@ return Promise.all(promises); | ||
/** | ||
Process jobs that have been added to the active list but are not being | ||
processed properly. | ||
* Process jobs that have been added to the active list but are not being | ||
* processed properly. This can happen due to a process crash in the middle | ||
* of processing a job, leaving it in 'active' but without a job lock. | ||
*/ | ||
Queue.prototype.processStalledJobs = function(){ | ||
Queue.prototype.moveUnlockedJobsToWait = function(){ | ||
var _this = this; | ||
if(this.closing){ | ||
return this.closing; | ||
} else{ | ||
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){ | ||
return Promise.each(jobs, function(jobId) { | ||
return Job.fromId(_this, jobId).then(_this.processStalledJob); | ||
return scripts.moveUnlockedJobsToWait(this).then(function(responses){ | ||
var handleFailedJobs = responses[0].map(function(jobId){ | ||
return _this.getJobFromId(jobId).then(function(job){ | ||
_this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit')); | ||
return null; | ||
}); | ||
}); | ||
var handleStalledJobs = responses[1].map(function(jobId){ | ||
return _this.getJobFromId(jobId).then(function(job){ | ||
_this.distEmit('stalled', job.toJSON()); | ||
return null; | ||
}); | ||
}); | ||
return Promise.all(handleFailedJobs.concat(handleStalledJobs)); | ||
}).catch(function(err){ | ||
console.error(err); | ||
console.error('Failed to handle unlocked job in active:', err); | ||
}); | ||
@@ -508,20 +580,2 @@ } | ||
Queue.prototype.processStalledJob = function(job){ | ||
var _this = this; | ||
if(this.closing){ | ||
return this.closing; | ||
} | ||
if(!job){ | ||
return Promise.resolve(); | ||
}else{ | ||
return scripts.getStalledJob(this, job, _this.token).then(function(isStalled){ | ||
if(isStalled){ | ||
_this.emit('stalled', job); | ||
return _this.processJob(job, true); | ||
} | ||
}); | ||
} | ||
}; | ||
Queue.prototype.processJobs = function(resolve, reject){ | ||
@@ -593,3 +647,3 @@ var _this = this; | ||
.then(function(){ | ||
_this.emit('completed', job, data); | ||
_this.distEmit('completed', job.toJSON(), data); | ||
return null; // Fixes #253 | ||
@@ -605,3 +659,3 @@ }); | ||
.then(function(){ | ||
_this.emit('failed', job, error); | ||
_this.distEmit('failed', job.toJSON(), error); | ||
return null; // Fixes #253 | ||
@@ -834,3 +888,3 @@ }); | ||
return scripts.cleanJobsInSet(_this, type, Date.now() - grace, limit).then(function (jobs) { | ||
_this.emit('cleaned', jobs, type); | ||
_this.distEmit('cleaned', jobs, type); | ||
resolve(jobs); | ||
@@ -837,0 +891,0 @@ return null; |
@@ -68,3 +68,6 @@ /** | ||
}, | ||
addJob: function(client, toKey, lifo, job){ | ||
addJob: function(client, toKey, job, opts){ | ||
opts = opts || {}; | ||
opts.lifo = !!(opts.lifo); | ||
var jobArgs = _.flatten(_.toPairs(job)); | ||
@@ -78,8 +81,12 @@ | ||
var argvs = _.map(jobArgs, function(arg, index){ | ||
return ', ARGV['+(index+2)+']'; | ||
return ', ARGV['+(index+3)+']'; | ||
}) | ||
var script = [ | ||
'local jobId = redis.call("INCR", KEYS[5])', | ||
'redis.call("HMSET", ARGV[1] .. jobId' + argvs.join('') + ')', | ||
'local jobCounter = redis.call("INCR", KEYS[5])', | ||
'local jobId', | ||
'if ARGV[2] == "" then jobId = jobCounter else jobId = ARGV[2] end', | ||
'local jobIdKey = ARGV[1] .. jobId', | ||
'if redis.call("EXISTS", jobIdKey) == 1 then return jobId end', | ||
'redis.call("HMSET", jobIdKey' + argvs.join('') + ')', | ||
]; | ||
@@ -92,3 +99,3 @@ | ||
script.push.apply(script, [ | ||
' local timestamp = tonumber(ARGV[' + (argvs.length + 2) + ']) * 0x1000 + bit.band(jobId, 0xfff)', | ||
' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)', | ||
' redis.call("ZADD", KEYS[6], timestamp, jobId)', | ||
@@ -101,3 +108,3 @@ ' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))', | ||
}else{ | ||
var push = (lifo ? 'R' : 'L') + 'PUSH'; | ||
var push = (opts.lifo ? 'R' : 'L') + 'PUSH'; | ||
// | ||
@@ -113,3 +120,3 @@ // Whe check for the meta-paused key to decide if we are paused or not | ||
'redis.call("PUBLISH", KEYS[4], jobId)', | ||
'return jobId' | ||
'return jobId .. ""' | ||
]); | ||
@@ -129,2 +136,3 @@ | ||
args.push(baseKey); | ||
args.push(opts.customJobId || ''); | ||
args.push.apply(args, jobArgs); | ||
@@ -135,6 +143,10 @@ args.push(delayTimestamp); | ||
}, | ||
moveToCompleted: function(job, token){ | ||
// TODO: perfect this function so that it can be used instead | ||
// of all the specialized functions moveToComplete, etc. | ||
move: function(job, token, src, target){ | ||
// TODO: Depending on the source we should use LREM, SREM or ZREM. | ||
// TODO: Depending on the target we should use LPUSH, SADD, etc. | ||
var keys = _.map([ | ||
'active', | ||
'completed', | ||
src, | ||
target, | ||
job.jobId | ||
@@ -145,16 +157,21 @@ ], function(name){ | ||
); | ||
keys.push(job.lockKey()); | ||
// | ||
// INVESTIGATE: Should'nt we check if we have the lock before trying to move the | ||
// job? | ||
// | ||
var deleteJob = 'redis.call("DEL", KEYS[3])'; | ||
var moveJob = [ | ||
'redis.call("SADD", KEYS[2], ARGV[1])', | ||
'redis.call("HSET", KEYS[3], "returnvalue", ARGV[2])', | ||
].join('\n'); | ||
var script = [ | ||
'if redis.call("EXISTS", KEYS[3]) == 1 then', | ||
' redis.call("SADD", KEYS[2], ARGV[1])', | ||
' redis.call("HSET", KEYS[3], "returnvalue", ARGV[2])', | ||
' redis.call("LREM", KEYS[1], 0, ARGV[1])', | ||
' if redis.call("get", KEYS[4]) == ARGV[3] then', // Remove the lock | ||
' return redis.call("del", KEYS[4])', | ||
'if redis.call("EXISTS", KEYS[3]) == 1 then', // Make sure job exists | ||
' local lock = redis.call("GET", KEYS[4])', | ||
' if (not lock) or (lock == ARGV[3]) then', // Makes sure we own the lock | ||
' redis.call("LREM", KEYS[1], -1, ARGV[1])', | ||
target ? moveJob : deleteJob, | ||
' redis.call("DEL", KEYS[4])', | ||
' return 0', | ||
' else', | ||
' return -2', | ||
' end', | ||
@@ -168,3 +185,3 @@ 'else', | ||
job.queue.client, | ||
'moveToCompletedSet', | ||
'move' + src + (target ? target : ''), | ||
script, | ||
@@ -182,7 +199,16 @@ keys.length, | ||
return execScript.apply(scripts, args).then(function(result){ | ||
if(result === -1){ | ||
throw new Error('Missing Job ' + job.jobId + ' when trying to move from active to completed'); | ||
switch (result){ | ||
case -1: | ||
if(src){ | ||
throw new Error('Missing Job ' + job.jobId + ' when trying to move from ' + src + ' to ' + target); | ||
} else { | ||
throw new Error('Missing Job ' + job.jobId + ' when trying to remove it from ' + src); | ||
} | ||
case -2: | ||
throw new Error('Cannot get lock for job ' + job.jobId + ' when trying to move from ' + src); | ||
} | ||
}); | ||
}, | ||
moveToCompleted: function(job, token, removeOnComplete){ | ||
return scripts.move(job, token, 'active', removeOnComplete ? void 0 : 'completed'); | ||
/* | ||
@@ -286,2 +312,38 @@ var params = {}; | ||
}, | ||
/** | ||
* Takes a lock | ||
*/ | ||
takeLock: function(queue, job, token, renew){ | ||
var lockCall; | ||
if (renew){ | ||
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])'; | ||
} else { | ||
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")'; | ||
} | ||
var script = [ | ||
'if(' + lockCall + ') then', | ||
// Mark the job as having been locked at least once. Used to determine if the job was stalled. | ||
' redis.call("HSET", KEYS[2], "lockAcquired", "1")', | ||
' return 1', | ||
'else', | ||
' return 0', | ||
'end' | ||
].join('\n'); | ||
var args = [ | ||
queue.client, | ||
'takeLock' + (renew ? 'Renew' : ''), | ||
script, | ||
2, | ||
job.lockKey(), | ||
queue.toKey(job.jobId), | ||
token, | ||
queue.LOCK_RENEW_TIME | ||
]; | ||
return execScript.apply(scripts, args); | ||
}, | ||
releaseLock: function(job, token){ | ||
@@ -291,5 +353,5 @@ var script = [ | ||
'then', | ||
'return redis.call("del", KEYS[1])', | ||
' return redis.call("del", KEYS[1])', | ||
'else', | ||
'return 0', | ||
' return 0', | ||
'end'].join('\n'); | ||
@@ -362,22 +424,61 @@ | ||
/** | ||
* Gets a stalled job by locking it and checking it is not already completed. | ||
* Returns a "OK" if the job was locked and not in completed set. | ||
* Looks for unlocked jobs in the active queue. There are two circumstances in which a job | ||
* would be in 'active' but NOT have a job lock: | ||
* | ||
* Case A) The job was being worked on, but the worker process died and it failed to renew the lock. | ||
* We call these jobs 'stalled'. This is the most common case. We resolve these by moving them | ||
* back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait, | ||
* (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to MAX_STALLED_JOB_COUNT. | ||
* Case B) The job was just moved to 'active' from 'wait' and the worker that moved it hasn't gotten | ||
* a lock yet, or died immediately before getting the lock (note that due to Redis limitations, the | ||
* worker can't move the job and get the lock atomically - https://github.com/OptimalBits/bull/issues/258). | ||
* For this case we also move the job back to 'wait' for reprocessing, but don't consider it 'stalled' | ||
* since the job had never been started. This case is much rarer than Case A due to the very small | ||
* timing window in which it must occur. | ||
*/ | ||
getStalledJob: function(queue, job, token){ | ||
moveUnlockedJobsToWait: function(queue){ | ||
var script = [ | ||
'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then', | ||
' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")', | ||
'local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])', | ||
'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)', | ||
'local stalled = {}', | ||
'local failed = {}', | ||
'for _, job in ipairs(activeJobs) do', | ||
' local jobKey = ARGV[2] .. job', | ||
' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then', | ||
// Remove from the active queue. | ||
' redis.call("LREM", KEYS[1], 1, job)', | ||
' local lockAcquired = redis.call("HGET", jobKey, "lockAcquired")', | ||
' if(lockAcquired) then', | ||
// If it was previously locked then we consider it 'stalled' (Case A above). If this job | ||
// has been stalled too many times, such as if it crashes the worker, then fail it. | ||
' local stalledCount = redis.call("HINCRBY", jobKey, "stalledCounter", 1)', | ||
' if(stalledCount > MAX_STALLED_JOB_COUNT) then', | ||
' redis.call("SADD", KEYS[3], job)', | ||
' redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")', | ||
' table.insert(failed, job)', | ||
' else', | ||
// Move the job back to the wait queue, to immediately be picked up by a waiting worker. | ||
' redis.call("RPUSH", KEYS[2], job)', | ||
' table.insert(stalled, job)', | ||
' end', | ||
' else', | ||
// Move the job back to the wait queue, to immediately be picked up by a waiting worker. | ||
' redis.call("RPUSH", KEYS[2], job)', | ||
' end', | ||
' end', | ||
'end', | ||
'return 0'].join('\n'); | ||
'return {failed, stalled}' | ||
].join('\n'); | ||
var args = [ | ||
queue.client, | ||
'getStalledJob', | ||
'moveUnlockedJobsToWait', | ||
script, | ||
2, | ||
queue.toKey('completed'), | ||
job.lockKey(), | ||
job.jobId, | ||
token, | ||
queue.LOCK_RENEW_TIME | ||
3, | ||
queue.toKey('active'), | ||
queue.toKey('wait'), | ||
queue.toKey('failed'), | ||
queue.MAX_STALLED_JOB_COUNT, | ||
queue.toKey('') | ||
]; | ||
@@ -460,2 +561,63 @@ | ||
return execScript.apply(scripts, args); | ||
}, | ||
/** | ||
* Attempts to reprocess a job | ||
* | ||
* @param {Job} job | ||
* @param {Object} options | ||
* @param {String} options.state The expected job state. If the job is not found | ||
* on the provided state, then it's not reprocessed. Supported states: 'failed', 'completed' | ||
* | ||
* @return {Promise<Number>} Returns a promise that evaluates to a return code: | ||
* 1 means the operation was a success | ||
* 0 means the job does not exist | ||
* -1 means the job is currently locked and can't be retried. | ||
* -2 means the job was not found in the expected set | ||
*/ | ||
reprocessJob: function(job, options) { | ||
var push = (job.opts.lifo ? 'R' : 'L') + 'PUSH'; | ||
var script = [ | ||
'if (redis.call("EXISTS", KEYS[1]) == 1) then', | ||
' if (redis.call("EXISTS", KEYS[2]) == 0) then', | ||
' if (redis.call("SREM", KEYS[3], ARGV[1]) == 1) then', | ||
' redis.call("' + push + '", KEYS[4], ARGV[1])', | ||
' redis.call("PUBLISH", KEYS[5], ARGV[1])', | ||
' return 1', | ||
' else', | ||
' return -2', | ||
' end', | ||
' else', | ||
' return -1', | ||
' end', | ||
'else', | ||
' return 0', | ||
'end' | ||
].join('\n'); | ||
var queue = job.queue; | ||
var keys = [ | ||
queue.toKey(job.jobId), | ||
queue.toKey(job.jobId) + ':lock', | ||
queue.toKey(options.state), | ||
queue.toKey('wait'), | ||
queue.toKey('jobs') | ||
]; | ||
var args = [ | ||
queue.client, | ||
'retryJob', | ||
script, | ||
5, | ||
keys[0], | ||
keys[1], | ||
keys[2], | ||
keys[3], | ||
keys[4], | ||
job.jobId | ||
]; | ||
return execScript.apply(scripts, args); | ||
} | ||
@@ -462,0 +624,0 @@ }; |
{ | ||
"name": "bull", | ||
"version": "1.0.0", | ||
"version": "1.1.0", | ||
"description": "Job manager", | ||
@@ -20,8 +20,9 @@ "main": "index.js", | ||
"dependencies": { | ||
"bluebird": "^3.4.1", | ||
"bluebird": "^3.4.6", | ||
"debuglog": "^1.0.0", | ||
"lodash": "^4.13.1", | ||
"disturbed": "^1.0.3", | ||
"lodash": "^4.16.6", | ||
"node-uuid": "^1.4.7", | ||
"redis": "^2.6.2", | ||
"semver": "^5.1.0" | ||
"redis": "^2.6.3", | ||
"semver": "^5.3.0" | ||
}, | ||
@@ -31,5 +32,5 @@ "devDependencies": { | ||
"gulp": "^3.9.1", | ||
"gulp-eslint": "^2.0.0", | ||
"gulp-eslint": "^2.1.0", | ||
"mocha": "^2.5.3", | ||
"sinon": "^1.17.4" | ||
"sinon": "^1.17.6" | ||
}, | ||
@@ -36,0 +37,0 @@ "scripts": { |
@@ -5,9 +5,9 @@ Bull Job Manager | ||
[![Join the chat at https://gitter.im/OptimalBits/bull](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/OptimalBits/bull?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) | ||
[![npm](https://img.shields.io/npm/dm/bull.svg?maxAge=2592000)]() | ||
[![BuildStatus](https://secure.travis-ci.org/OptimalBits/bull.png?branch=master)](http://travis-ci.org/OptimalBits/bull) | ||
[![NPM version](https://badge.fury.io/js/bull.svg)](http://badge.fury.io/js/bull) | ||
![bull](http://files.softicons.com/download/animal-icons/animal-icons-by-martin-berube/png/128/bull.png) | ||
<img src="https://image.freepik.com/free-icon/strong-bull-side-view_318-52710.jpg", width="200" /> | ||
A lightweight, robust and fast job processing queue. | ||
The fastest, more reliable redis based queue for nodejs. | ||
Carefully written for rock solid stability and atomicity. | ||
@@ -22,2 +22,14 @@ | ||
Sponsors: | ||
--------- | ||
<a href="http://mixmax.com"> | ||
<img src="https://mixmax.com/images/logo_confirmation.png" alt="Mixmax, Inc" width="100" /> | ||
</a> | ||
<a href="http://optimalbits.com"> | ||
<img src="http://optimalbits.com/images/logo.png" /> | ||
</a> | ||
Are you developing bull sponsored by a company? Please, let us now! | ||
Features: | ||
@@ -33,2 +45,3 @@ --------- | ||
- Pause/resume (globally or locally). | ||
- Automatic recovery from process crashes. | ||
@@ -40,5 +53,5 @@ UIs: | ||
[matador](https://github.com/ShaneK/Matador) | ||
[react-bull](https://github.com/kfatehi/react-bull) | ||
[toureiro](https://github.com/Epharmix/Toureiro) | ||
* [matador](https://github.com/ShaneK/Matador) | ||
* [react-bull](https://github.com/kfatehi/react-bull) | ||
* [toureiro](https://github.com/Epharmix/Toureiro) | ||
@@ -62,2 +75,3 @@ We also have an official UI which is at the moment bare bones project: [bull-ui](https://github.com/OptimalBits/bull-ui) | ||
var imageQueue = Queue('image transcoding', 6379, '127.0.0.1'); | ||
var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1'); | ||
@@ -122,3 +136,3 @@ videoQueue.process(function(job, done){ | ||
return pdfAsyncProcessor(); | ||
} | ||
}); | ||
@@ -176,4 +190,3 @@ videoQueue.add({video: 'http://example.com/video1.mov'}); | ||
.on('stalled', function(job){ | ||
// The job was considered stalled (i.e. its lock was not renewed in LOCK_RENEW_TIME). | ||
// Useful for debugging job workers that crash or pause the event loop. | ||
// Job that was considered stalled. Useful for debugging job workers that crash or pause the event loop. | ||
}) | ||
@@ -251,2 +264,3 @@ .on('progress', function(job, progress){ | ||
If the process that is handling the job fails the reacquire the lock (because it hung or crashed), the job will be automatically restarted by any worker. | ||
@@ -321,2 +335,3 @@ Useful patterns | ||
* [Queue##close](#close) | ||
* [Queue##getJob](#getJob) | ||
* [Job](#job) | ||
@@ -330,2 +345,3 @@ * [Job##remove](#remove) | ||
###Queue(queueName, redisPort, redisHost, [redisOpts]) | ||
###Queue(queueName, redisConnectionString, [redisOpts]) | ||
@@ -345,2 +361,12 @@ This is the Queue constructor. It creates a new Queue that is persisted in | ||
Alternatively, it's possible to pass a connection string to create a new queue. | ||
__Arguments__ | ||
```javascript | ||
queueName {String} A unique name for this Queue. | ||
redisConnectionString {String} A connection string containing the redis server host, port and (optional) authentication. | ||
redisOptions {Object} Options to pass to the redis client. https://github.com/mranney/node_redis | ||
``` | ||
--------------------------------------- | ||
@@ -413,17 +439,26 @@ | ||
```javascript | ||
data {PlainObject} A plain object with arguments that will be passed | ||
to the job processing function in job.data. | ||
opts {PlainObject} A plain object with arguments that will be passed | ||
to the job processing function in job.opts | ||
opts.delay {Number} An amount of miliseconds to wait until this job | ||
can be processed. Note that for accurate delays, both server and clients | ||
should have their clocks synchronized. [optional] | ||
opts.attempts {Number} The total number of attempts to try the job until it completes. | ||
opts.backoff {Number|Object} Backoff setting for automatic retries if the job fails | ||
opts.backoff.type {String} Backoff type, which can be either `fixed` or `exponential` | ||
opts.backoff.delay {String} Backoff delay, in milliseconds | ||
opts.lifo {Boolean} A boolean which, if true, adds the job to the right | ||
of the queue instead of the left (default false) | ||
opts.timeout {Number} The number of milliseconds after which the job | ||
should be fail with a timeout error [optional] | ||
data {PlainObject} A plain object with arguments that will be passed to | ||
the job processing function in job.data. | ||
opts A plain object with arguments that will be passed to the job | ||
processing function in job.opts. | ||
{ | ||
delay {Number} An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both server and clients should have their clocks synchronized. [optional] | ||
attempts {Number} The total number of attempts to try the job until it completes. | ||
backoff {Number|Object} Backoff setting for automatic retries if the job fails | ||
backoff.type {String} Backoff type, which can be either `fixed` or `exponential` | ||
backoff.delay {String} Backoff delay, in milliseconds | ||
lifo {Boolean} A boolean which, if true, adds the job to the right of the queue | ||
instead of the left (default false) | ||
timeout {Number} The number of milliseconds after which the job should be fail | ||
with a timeout error [optional] | ||
jobId {Number|String} Override the job ID - by default, the job ID is a unique | ||
integer, but you can use this setting to override it. | ||
If you use this option, it is up to you to ensure the | ||
jobId is unique. If you attempt to add a job with an id that | ||
already exists, it will not be added. | ||
removeOnComplete {Boolean} A boolean which, if true, removes the job when it successfully | ||
completes. Default behavior is to keep the job in the completed queue. | ||
} | ||
returns {Promise} A promise that resolves when the job has been succesfully | ||
@@ -577,7 +612,5 @@ added to the queue (or rejects if some error occured). On success, the promise | ||
<a name="clean"/> | ||
#### Queue##clean(options) | ||
#### Queue##clean(grace, [type], [limit]) | ||
Tells the queue remove all jobs created outside of a grace period. | ||
You can clean the jobs with the following states: completed, waiting, active, | ||
delayed, and failed. | ||
Tells the queue remove jobs of a specific type created outside of a grace period. | ||
@@ -602,2 +635,3 @@ __Example__ | ||
delayed, and failed. Defaults to completed. | ||
limit {int} maximum amount of jobs to clean per call. If not provided will clean all matching jobs. | ||
returns {Promise} A promise that resolves with an array of removed jobs. | ||
@@ -604,0 +638,0 @@ ``` |
@@ -59,2 +59,24 @@ /*eslint-env node */ | ||
}); | ||
it('should use the custom jobId if one is provided', function() { | ||
var customJobId = 'customjob'; | ||
return Job.create(queue, data, { jobId: customJobId }).then(function(createdJob){ | ||
expect(createdJob.jobId).to.be.equal(customJobId); | ||
}); | ||
}); | ||
it('should process jobs with custom jobIds', function(done) { | ||
var customJobId = 'customjob'; | ||
queue.process(function () { | ||
return Promise.resolve(); | ||
}); | ||
queue.add({ foo: 'bar' }, { jobId: customJobId }); | ||
queue.on('completed', function(job) { | ||
if (job.opts.jobId == customJobId) { | ||
done(); | ||
} | ||
}); | ||
}); | ||
}); | ||
@@ -365,12 +387,12 @@ | ||
return Job.create(queue, {foo: 'baz'}).then(function(job) { | ||
return job.isStuck().then(function(yes) { | ||
expect(yes).to.be(false); | ||
return job.isStuck().then(function(isStuck) { | ||
expect(isStuck).to.be(false); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('waiting'); | ||
return job.moveToCompleted(); | ||
return job.move('wait', 'completed'); | ||
}).then(function (){ | ||
return job.isCompleted(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
}).then(function (isCompleted) { | ||
expect(isCompleted).to.be(true); | ||
return job.getState(); | ||
@@ -394,4 +416,4 @@ }).then(function(state) { | ||
return job.isFailed(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
}).then(function (isFailed) { | ||
expect(isFailed).to.be(true); | ||
return job.getState(); | ||
@@ -405,3 +427,3 @@ }).then(function(state) { | ||
}).then(function(state) { | ||
expect(state).to.be('waiting'); | ||
expect(state).to.be('stuck'); | ||
return client.rpopAsync(queue.toKey('wait')); | ||
@@ -412,4 +434,4 @@ }).then(function(){ | ||
return job.isPaused(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
}).then(function (isPaused) { | ||
expect(isPaused).to.be(true); | ||
return job.getState(); | ||
@@ -423,4 +445,4 @@ }).then(function(state) { | ||
return job.isWaiting(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
}).then(function (isWaiting) { | ||
expect(isWaiting).to.be(true); | ||
return job.getState(); | ||
@@ -433,2 +455,61 @@ }).then(function(state) { | ||
describe('.finished', function() { | ||
it('should resolve when the job has been completed', function(done){ | ||
queue.process(function () { | ||
return Promise.resolve(); | ||
}); | ||
queue.add({ foo: 'bar' }).then(function(job){ | ||
return job.finished(); | ||
}).then(function(){ | ||
done(); | ||
}, done); | ||
}); | ||
it('should reject when the job has been completed', function(done){ | ||
queue.process(function () { | ||
return Promise.reject(Error('test error')); | ||
}); | ||
queue.add({ foo: 'bar' }).then(function(job){ | ||
return job.finished(); | ||
}).then(function(){ | ||
done(Error('should have been rejected')); | ||
}, function(err){ | ||
expect(err.message).equal('test error'); | ||
done(); | ||
}); | ||
}); | ||
it('should resolve directly if already processed', function(done){ | ||
queue.process(function () { | ||
return Promise.resolve(); | ||
}); | ||
queue.add({ foo: 'bar' }).then(function(job){ | ||
return Promise.delay(1500).then(function(){ | ||
return job.finished(); | ||
}) | ||
}).then(function(){ | ||
done(); | ||
}, done); | ||
}); | ||
it('should reject directly if already processed', function(done){ | ||
queue.process(function () { | ||
return Promise.reject(Error('test error')); | ||
}); | ||
queue.add({ foo: 'bar' }).then(function(job){ | ||
return Promise.delay(1500).then(function(){ | ||
return job.finished(); | ||
}); | ||
}).then(function(){ | ||
done(Error('should have been rejected')); | ||
}, function(err){ | ||
expect(err.message).equal('test error'); | ||
done(); | ||
}); | ||
}); | ||
it.skip('should resolve using the watchdog if pubsub was lost'); | ||
it.skip('should reject using the watchdog if pubsub was lost'); | ||
}); | ||
}); |
@@ -24,3 +24,3 @@ /// <reference path='../typings/mocha/mocha.d.ts'/> | ||
describe('Priority queue', function(){ | ||
describe.skip('Priority queue', function(){ | ||
var queue; | ||
@@ -27,0 +27,0 @@ var sandbox = sinon.sandbox.create(); |
@@ -33,3 +33,3 @@ /*eslint-env node */ | ||
beforeEach(function(){ | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
@@ -39,3 +39,3 @@ return client.flushdbAsync(); | ||
afterEach(function(){ | ||
afterEach(function () { | ||
sandbox.restore(); | ||
@@ -47,3 +47,3 @@ }); | ||
beforeEach(function () { | ||
return utils.newQueue('test').then(function(queue){ | ||
return utils.newQueue('test').then(function (queue) { | ||
testQueue = queue; | ||
@@ -54,3 +54,3 @@ }); | ||
it('should call end on the client', function (done) { | ||
testQueue.client.once('end', function(){ | ||
testQueue.client.once('end', function () { | ||
done(); | ||
@@ -69,3 +69,3 @@ }); | ||
it('should call end on the event subscriber client', function (done) { | ||
testQueue.eclient.once('end', function(){ | ||
testQueue.eclient.once('end', function () { | ||
done(); | ||
@@ -129,3 +129,3 @@ }); | ||
testQueue.on('completed', function(){ | ||
testQueue.on('completed', function () { | ||
testQueue.close().then(done); | ||
@@ -160,2 +160,20 @@ }); | ||
it('should create a queue with a redis connection string', function (done) { | ||
var queue = new Queue('connstring', 'redis://127.0.0.1:6379'); | ||
queue.once('ready', function () { | ||
expect(queue.client.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.client.connection_options.port).to.be(6379); | ||
expect(queue.bclient.connection_options.port).to.be(6379); | ||
expect(queue.client.selected_db).to.be(0); | ||
expect(queue.bclient.selected_db).to.be(0); | ||
queue.close().then(done); | ||
}); | ||
}); | ||
it('creates a queue using the supplied redis DB', function (done) { | ||
@@ -203,3 +221,3 @@ var queue = new Queue('custom', { redis: { DB: 1 } }); | ||
}); | ||
}).then(function(){ | ||
}).then(function () { | ||
return queue.close(); | ||
@@ -213,7 +231,7 @@ }); | ||
beforeEach(function(){ | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
return client.flushdbAsync().then(function(){ | ||
return client.flushdbAsync().then(function () { | ||
return utils.newQueue(); | ||
}).then(function(_queue){ | ||
}).then(function (_queue) { | ||
queue = _queue; | ||
@@ -223,3 +241,3 @@ }); | ||
afterEach(function(){ | ||
afterEach(function () { | ||
return utils.cleanupQueues(); | ||
@@ -241,14 +259,12 @@ }); | ||
it('process a lifo queue', function (done) { | ||
this.timeout(12000); | ||
this.timeout(3000); | ||
var currentValue = 0, first = true; | ||
utils.newQueue('test lifo').then(function(queue2){ | ||
utils.newQueue('test lifo').then(function (queue2) { | ||
queue2.process(function (job, jobDone) { | ||
// Catching the job before the pause | ||
expect(job.data.count).to.be.equal(currentValue--); | ||
if(first) { | ||
jobDone(); | ||
if (first) { | ||
first = false; | ||
return jobDone(); | ||
} | ||
jobDone(); | ||
if(currentValue === 0) { | ||
} else if (currentValue === 0) { | ||
done(); | ||
@@ -258,13 +274,10 @@ } | ||
// Add a job to pend proccessing | ||
queue2.add({ 'count': 0 }).then(function () { | ||
queue2.pause().then(function () { | ||
// Add a series of jobs in a predictable order | ||
var fn = function (cb) { | ||
queue2.add({ 'count': ++currentValue }, { 'lifo': true }).then(cb); | ||
}; | ||
fn(fn(fn(fn(function () { | ||
queue2.resume(); | ||
})))); | ||
}); | ||
queue2.pause().then(function () { | ||
// Add a series of jobs in a predictable order | ||
var fn = function (cb) { | ||
queue2.add({ 'count': ++currentValue }, { 'lifo': true }).then(cb); | ||
}; | ||
fn(fn(fn(fn(function () { | ||
queue2.resume(); | ||
})))); | ||
}); | ||
@@ -283,3 +296,3 @@ }); | ||
jobDone(); | ||
if(counter === maxJobs) { | ||
if (counter === maxJobs) { | ||
done(); | ||
@@ -290,3 +303,3 @@ } | ||
for(var i = 1; i <= maxJobs; i++) { | ||
for (var i = 1; i <= maxJobs; i++) { | ||
queue.add({ foo: 'bar', num: i }); | ||
@@ -359,3 +372,3 @@ } | ||
expect(job.data.foo).to.be.equal('bar'); | ||
return Promise.delay(250).then(function(){ | ||
return Promise.delay(250).then(function () { | ||
return 'my data'; | ||
@@ -413,3 +426,3 @@ }); | ||
this.timeout(12000); | ||
utils.newQueue('test queue stalled').then(function(queueStalled){ | ||
utils.newQueue('test queue stalled').then(function (queueStalled) { | ||
queueStalled.LOCK_RENEW_TIME = 10; | ||
@@ -427,5 +440,5 @@ var jobs = [ | ||
return queueStalled.close(true).then(function(){ | ||
return new Promise(function(resolve) { | ||
utils.newQueue('test queue stalled').then(function(queue2){ | ||
return queueStalled.close(true).then(function () { | ||
return new Promise(function (resolve) { | ||
utils.newQueue('test queue stalled').then(function (queue2) { | ||
queue2.LOCK_RENEW_TIME = 100; | ||
@@ -458,3 +471,3 @@ var doneAfterFour = _.after(4, function () { | ||
it('processes jobs that were added before the queue backend started', function () { | ||
utils.newQueue('test queue added before').then(function(queueStalled){ | ||
utils.newQueue('test queue added before').then(function (queueStalled) { | ||
queueStalled.LOCK_RENEW_TIME = 10; | ||
@@ -471,3 +484,3 @@ var jobs = [ | ||
.then(function () { | ||
utils.newQueue('test queue added before').then(function(queue2){ | ||
utils.newQueue('test queue added before').then(function (queue2) { | ||
queue2.process(function (job, jobDone) { | ||
@@ -494,3 +507,3 @@ jobDone(); | ||
for(var i = 0; i < NUM_QUEUES; i++) { | ||
for (var i = 0; i < NUM_QUEUES; i++) { | ||
var queueStalled2 = new Queue('test queue stalled 2', 6379, '127.0.0.1'); | ||
@@ -500,3 +513,3 @@ stalledQueues.push(queueStalled2); | ||
for(var j = 0; j < NUM_JOBS_PER_QUEUE; j++) { | ||
for (var j = 0; j < NUM_JOBS_PER_QUEUE; j++) { | ||
jobs.push(queueStalled2.add({ job: j })); | ||
@@ -514,3 +527,3 @@ } | ||
processed++; | ||
if(processed === stalledQueues.length) { | ||
if (processed === stalledQueues.length) { | ||
setTimeout(function () { | ||
@@ -525,3 +538,3 @@ var queue2 = new Queue('test queue stalled 2', 6379, '127.0.0.1'); | ||
counter++; | ||
if(counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) { | ||
if (counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) { | ||
queue2.close().then(done); | ||
@@ -536,3 +549,3 @@ } | ||
var processes = []; | ||
for(var k = 0; k < stalledQueues.length; k++) { | ||
for (var k = 0; k < stalledQueues.length; k++) { | ||
processes.push(stalledQueues[k].process(procFn)); | ||
@@ -547,3 +560,8 @@ } | ||
var err = null; | ||
var anotherQueue; | ||
queue.on('completed', function () { | ||
utils.cleanupQueue(anotherQueue).then(done.bind(null, err)); | ||
}); | ||
queue.add({ foo: 'bar' }).then(function (addedJob) { | ||
@@ -553,3 +571,3 @@ queue.process(function (job, jobDone) { | ||
if(addedJob.jobId !== job.jobId) { | ||
if (addedJob.jobId !== job.jobId) { | ||
err = new Error('Processed job id does not match that of added job'); | ||
@@ -559,4 +577,6 @@ } | ||
}); | ||
setTimeout(function () { | ||
utils.newQueue().then(function(anotherQueue){ | ||
utils.newQueue().then(function (_anotherQueue) { | ||
anotherQueue = _anotherQueue; | ||
setTimeout(function () { | ||
anotherQueue.process(function (job, jobDone) { | ||
@@ -566,8 +586,4 @@ err = new Error('The second queue should not have received a job to process'); | ||
}); | ||
queue.on('completed', function () { | ||
utils.cleanupQueue(anotherQueue).then(done.bind(null, err)); | ||
}); | ||
}); | ||
}, 50); | ||
}, 50); | ||
}); | ||
}); | ||
@@ -581,3 +597,3 @@ }); | ||
var collect = _.after(2, function(){ | ||
var collect = _.after(2, function () { | ||
queue2.close().then(done); | ||
@@ -652,3 +668,3 @@ }); | ||
it('process a job that returns data with a circular dependency', function(done){ | ||
it('process a job that returns data with a circular dependency', function (done) { | ||
queue.on('error', function (err) { | ||
@@ -721,3 +737,3 @@ done(err); | ||
called++; | ||
if(called % 2 !== 0) { | ||
if (called % 2 !== 0) { | ||
throw new Error('Not even!'); | ||
@@ -751,3 +767,3 @@ } | ||
for(var i = 1; i <= maxJobs; i++) { | ||
for (var i = 1; i <= maxJobs; i++) { | ||
added.push(queue.add({ foo: 'bar', num: i })); | ||
@@ -779,3 +795,3 @@ } | ||
describe('.pause', function () { | ||
beforeEach(function(){ | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
@@ -788,3 +804,3 @@ return client.flushdbAsync(); | ||
utils.newQueue().then(function(queue){ | ||
utils.newQueue().then(function (queue) { | ||
var resultPromise = new Promise(function (resolve) { | ||
@@ -796,3 +812,3 @@ queue.process(function (job, jobDone) { | ||
counter--; | ||
if(counter === 0) { | ||
if (counter === 0) { | ||
resolve(queue.close()); | ||
@@ -818,3 +834,3 @@ } | ||
utils.newQueue().then(function(queue){ | ||
utils.newQueue().then(function (queue) { | ||
queue.process(function (job, jobDone) { | ||
@@ -825,7 +841,7 @@ expect(ispaused).to.be(false); | ||
if(first) { | ||
if (first) { | ||
first = false; | ||
ispaused = true; | ||
queue.pause(); | ||
}else{ | ||
} else { | ||
expect(isresumed).to.be(true); | ||
@@ -850,3 +866,3 @@ queue.close().then(done); | ||
it('should pause the queue locally', function(testDone){ | ||
it('should pause the queue locally', function (testDone) { | ||
var counter = 2; | ||
@@ -856,19 +872,19 @@ | ||
queue.pause(true /* Local */).then(function(){ | ||
queue.pause(true /* Local */).then(function () { | ||
// Add the worker after the queue is in paused mode since the normal behavior is to pause | ||
// it after the current lock expires. This way, we can ensure there isn't a lock already | ||
// to test that pausing behavior works. | ||
queue.process(function(job, done){ | ||
queue.process(function (job, done) { | ||
expect(queue.paused).not.to.be.ok(); | ||
done(); | ||
counter--; | ||
if(counter === 0){ | ||
if (counter === 0) { | ||
queue.close().then(testDone); | ||
} | ||
}); | ||
}).then(function(){ | ||
}).then(function () { | ||
return queue.add({ foo: 'paused' }); | ||
}).then(function(){ | ||
}).then(function () { | ||
return queue.add({ foo: 'paused' }); | ||
}).then(function(){ | ||
}).then(function () { | ||
expect(counter).to.be(2); | ||
@@ -888,8 +904,8 @@ expect(queue.paused).to.be.ok(); // Parameter should exist. | ||
var jobs = []; | ||
for(var i = 0; i < 10; i++) { | ||
for (var i = 0; i < 10; i++) { | ||
jobs.push(queue.add(i)); | ||
} | ||
Promise.all(jobs).then(function() { | ||
queue.pause(true).then(function() { | ||
var active = queue.getJobCountByTypes(['active']).then(function(count) { | ||
Promise.all(jobs).then(function () { | ||
queue.pause(true).then(function () { | ||
var active = queue.getJobCountByTypes(['active']).then(function (count) { | ||
expect(count).to.be(0); | ||
@@ -901,3 +917,3 @@ expect(queue.paused).to.be.ok(); | ||
// One job from the 10 posted above will be processed, so we expect 9 jobs pending | ||
var paused = queue.getJobCountByTypes(['wait', 'delayed']).then(function(count) { | ||
var paused = queue.getJobCountByTypes(['wait', 'delayed']).then(function (count) { | ||
expect(count).to.be(9); | ||
@@ -909,6 +925,6 @@ return null; | ||
}).then(function() { | ||
}).then(function () { | ||
return queue.add({}); | ||
}).then(function() { | ||
var active = queue.getJobCountByTypes(['active']).then(function(count) { | ||
}).then(function () { | ||
var active = queue.getJobCountByTypes(['active']).then(function (count) { | ||
expect(count).to.be(0); | ||
@@ -918,3 +934,3 @@ return null; | ||
var paused = queue.getJobCountByTypes(['wait', 'delayed']).then(function(count) { | ||
var paused = queue.getJobCountByTypes(['wait', 'delayed']).then(function (count) { | ||
expect(count).to.be(10); | ||
@@ -925,3 +941,3 @@ return null; | ||
return Promise.all([active, paused]); | ||
}).then(function() { | ||
}).then(function () { | ||
return queue.close().then(done, done); | ||
@@ -965,3 +981,3 @@ }); | ||
beforeEach(function(){ | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
@@ -1016,7 +1032,7 @@ return client.flushdbAsync(); | ||
queue.on('failed', function(err){ | ||
queue.on('failed', function (err) { | ||
done(err); | ||
}); | ||
queue.on('ready', function() { | ||
queue.on('ready', function () { | ||
@@ -1027,3 +1043,3 @@ queue.process(function (job, jobDone) { | ||
jobDone(); | ||
if(order === 10) { | ||
if (order === 10) { | ||
queue.close().then(done, done); | ||
@@ -1059,3 +1075,3 @@ } | ||
if(order === 4) { | ||
if (order === 4) { | ||
queue.close().then(done, done); | ||
@@ -1102,3 +1118,3 @@ } | ||
if(order === 12) { | ||
if (order === 12) { | ||
queue.close().then(done, done); | ||
@@ -1110,7 +1126,7 @@ } | ||
queue.on('ready', function() { | ||
queue.on('ready', function () { | ||
var now = Date.now(); | ||
var _promises = []; | ||
var _i = 1; | ||
for(_i; _i <= 12; _i++){ | ||
for (_i; _i <= 12; _i++) { | ||
_promises.push(queue.add({ order: _i }, { | ||
@@ -1131,3 +1147,3 @@ delay: 1000, | ||
beforeEach(function(){ | ||
beforeEach(function () { | ||
var client = redis.createClient(); | ||
@@ -1138,3 +1154,3 @@ queue = utils.buildQueue(); | ||
afterEach(function(){ | ||
afterEach(function () { | ||
return queue.close(); | ||
@@ -1203,3 +1219,3 @@ }); | ||
if(++i === 4){ | ||
if (++i === 4) { | ||
queue.pause().then(function () { | ||
@@ -1215,3 +1231,3 @@ Promise.delay(500).then(function () { // Wait for all the active jobs to finalize. | ||
// They had a bug in pause() with this special case. | ||
if(i % 3 === 0){ | ||
if (i % 3 === 0) { | ||
error = new Error(); | ||
@@ -1243,12 +1259,12 @@ } | ||
describe('Retries and backoffs', function() { | ||
describe('Retries and backoffs', function () { | ||
var queue; | ||
afterEach(function(){ | ||
afterEach(function () { | ||
return queue.close(); | ||
}); | ||
it('should automatically retry a failed job if attempts is bigger than 1', function(done) { | ||
it('should automatically retry a failed job if attempts is bigger than 1', function (done) { | ||
queue = utils.buildQueue('test retries and backoffs'); | ||
queue.on('ready', function() { | ||
queue.on('ready', function () { | ||
@@ -1258,3 +1274,3 @@ var tries = 0; | ||
tries++; | ||
if(job.attemptsMade < 2){ | ||
if (job.attemptsMade < 2) { | ||
throw new Error('Not yet!'); | ||
@@ -1270,3 +1286,3 @@ } | ||
}); | ||
queue.on('completed', function() { | ||
queue.on('completed', function () { | ||
done(); | ||
@@ -1276,9 +1292,9 @@ }); | ||
it('should not retry a failed job more than the number of given attempts times', function(done) { | ||
it('should not retry a failed job more than the number of given attempts times', function (done) { | ||
queue = utils.buildQueue('test retries and backoffs'); | ||
var tries = 0; | ||
queue.on('ready', function() { | ||
queue.on('ready', function () { | ||
queue.process(function (job, jobDone) { | ||
tries++; | ||
if(job.attemptsMade < 3){ | ||
if (job.attemptsMade < 3) { | ||
throw new Error('Not yet!'); | ||
@@ -1294,7 +1310,7 @@ } | ||
}); | ||
queue.on('completed', function() { | ||
queue.on('completed', function () { | ||
done(new Error('Failed job was retried more than it should be!')); | ||
}); | ||
queue.on('failed', function() { | ||
if(tries === 3){ | ||
queue.on('failed', function () { | ||
if (tries === 3) { | ||
done(); | ||
@@ -1305,9 +1321,9 @@ } | ||
it('should retry a job after a delay if a fixed backoff is given', function(done) { | ||
it('should retry a job after a delay if a fixed backoff is given', function (done) { | ||
this.timeout(12000); | ||
queue = utils.buildQueue('test retries and backoffs'); | ||
var start; | ||
queue.on('ready', function() { | ||
queue.on('ready', function () { | ||
queue.process(function (job, jobDone) { | ||
if(job.attemptsMade < 2){ | ||
if (job.attemptsMade < 2) { | ||
throw new Error('Not yet!'); | ||
@@ -1324,3 +1340,3 @@ } | ||
}); | ||
queue.on('completed', function() { | ||
queue.on('completed', function () { | ||
var elapse = Date.now() - start; | ||
@@ -1332,9 +1348,9 @@ expect(elapse).to.be.greaterThan(2000); | ||
it('should retry a job after a delay if an exponential backoff is given', function(done) { | ||
it('should retry a job after a delay if an exponential backoff is given', function (done) { | ||
this.timeout(12000); | ||
queue = utils.buildQueue('test retries and backoffs'); | ||
var start; | ||
queue.on('ready', function() { | ||
queue.on('ready', function () { | ||
queue.process(function (job, jobDone) { | ||
if(job.attemptsMade < 2){ | ||
if (job.attemptsMade < 2) { | ||
throw new Error('Not yet!'); | ||
@@ -1354,3 +1370,3 @@ } | ||
}); | ||
queue.on('completed', function() { | ||
queue.on('completed', function () { | ||
var elapse = Date.now() - start; | ||
@@ -1363,2 +1379,120 @@ var expected = 1000 * (Math.pow(2, 2) - 1); | ||
it('should not retry a job that has been removed', function (done) { | ||
queue = utils.buildQueue('retry a removed job'); | ||
queue.on('ready', function () { | ||
var attempts = 0; | ||
queue.process(function (job, jobDone) { | ||
if (attempts === 0) { | ||
attempts++; | ||
throw new Error('failed'); | ||
} else { | ||
jobDone(); | ||
} | ||
}); | ||
queue.add({ foo: 'bar' }); | ||
}); | ||
var failedHandler = _.once(function (job, err) { | ||
expect(job.data.foo).to.equal('bar'); | ||
expect(err.message).to.equal('failed'); | ||
job.retry().delay(100) | ||
.then(function () { | ||
return queue.getCompletedCount().then(function (count) { | ||
return expect(count).to.equal(1); | ||
}); | ||
}) | ||
.then(function () { | ||
return queue.clean(0).then(function () { | ||
return job.retry().catch(function (err) { | ||
expect(err.message).to.equal('Couldn\'t retry job: The job doesn\'t exist'); | ||
}); | ||
}); | ||
}) | ||
.then(function () { | ||
return Promise.all([ | ||
queue.getCompletedCount().then(function (count) { | ||
return expect(count).to.equal(0); | ||
}), | ||
queue.getFailedCount().then(function (count) { | ||
return expect(count).to.equal(0); | ||
}) | ||
]); | ||
}) | ||
.then(function () { | ||
done(); | ||
}, done); | ||
}); | ||
queue.on('failed', failedHandler); | ||
}); | ||
it('should not retry a job that has been retried already', function (done) { | ||
queue = utils.buildQueue('retry already retried job'); | ||
queue.on('ready', function () { | ||
var attempts = 0; | ||
queue.process(function (job, jobDone) { | ||
if (attempts === 0) { | ||
attempts++; | ||
throw new Error('failed'); | ||
} else { | ||
jobDone(); | ||
} | ||
}); | ||
queue.add({ foo: 'bar' }); | ||
}); | ||
var failedHandler = _.once(function (job, err) { | ||
expect(job.data.foo).to.equal('bar'); | ||
expect(err.message).to.equal('failed'); | ||
job.retry().delay(100) | ||
.then(function () { | ||
return queue.getCompletedCount().then(function (count) { | ||
return expect(count).to.equal(1); | ||
}); | ||
}) | ||
.then(function () { | ||
return job.retry().catch(function (err) { | ||
expect(err.message).to.equal('Couldn\'t retry job: The job has been already retried or has not failed'); | ||
}); | ||
}) | ||
.then(function () { | ||
return Promise.all([ | ||
queue.getCompletedCount().then(function (count) { | ||
return expect(count).to.equal(1); | ||
}), | ||
queue.getFailedCount().then(function (count) { | ||
return expect(count).to.equal(0); | ||
}) | ||
]); | ||
}) | ||
.then(function () { | ||
done(); | ||
}, done); | ||
}); | ||
queue.on('failed', failedHandler); | ||
}); | ||
it('should not retry a job that is locked', function (done) { | ||
queue = utils.buildQueue('retry a locked job'); | ||
var addedHandler = _.once(function (job) { | ||
expect(job.data.foo).to.equal('bar'); | ||
job.retry().catch(function (err) { | ||
expect(err.message).to.equal('Couldn\'t retry job: The job has been already retried or has not failed'); | ||
return null; | ||
}).then(done, done); | ||
}); | ||
queue.on('ready', function () { | ||
queue.process(function (job, jobDone) { | ||
return Promise.delay(200).then(jobDone); | ||
}); | ||
queue.add({ foo: 'bar' }).then(addedHandler); | ||
}); | ||
}); | ||
}); | ||
@@ -1369,3 +1503,3 @@ | ||
beforeEach(function(){ | ||
beforeEach(function () { | ||
queue = utils.buildQueue(); | ||
@@ -1375,3 +1509,3 @@ return queue.clean(1000); | ||
afterEach(function(){ | ||
afterEach(function () { | ||
return queue.close(); | ||
@@ -1426,3 +1560,3 @@ }); | ||
if(counter === 0){ | ||
if (counter === 0) { | ||
queue.getCompleted().then(function (jobs) { | ||
@@ -1451,3 +1585,3 @@ expect(jobs).to.be.a('array'); | ||
if(counter === 0){ | ||
if (counter === 0) { | ||
queue.getFailed().then(function (jobs) { | ||
@@ -1486,7 +1620,7 @@ expect(jobs).to.be.a('array'); | ||
describe('getJobs', function() { | ||
describe('getJobs', function () { | ||
this.timeout(12000); | ||
var queue; | ||
beforeEach(function(){ | ||
beforeEach(function () { | ||
queue = utils.buildQueue(); | ||
@@ -1496,13 +1630,13 @@ return queue.clean(1000); | ||
afterEach(function(){ | ||
afterEach(function () { | ||
return queue.close(); | ||
}); | ||
it('should return all completed jobs when not setting start/end', function(done) { | ||
queue.process(function(job, completed) { | ||
it('should return all completed jobs when not setting start/end', function (done) { | ||
queue.process(function (job, completed) { | ||
completed(); | ||
}); | ||
queue.on('completed', _.after(3, function() { | ||
queue.getJobs('completed', 'SET').then(function(jobs) { | ||
queue.on('completed', _.after(3, function () { | ||
queue.getJobs('completed', 'SET').then(function (jobs) { | ||
expect(jobs).to.be.an(Array); | ||
@@ -1519,9 +1653,9 @@ expect(jobs).to.have.length(3); | ||
it('should return all failed jobs when not setting start/end', function(done) { | ||
queue.process(function(job, completed) { | ||
it('should return all failed jobs when not setting start/end', function (done) { | ||
queue.process(function (job, completed) { | ||
completed(new Error('error')); | ||
}); | ||
queue.on('failed', _.after(3, function() { | ||
queue.getJobs('failed', 'SET').then(function(jobs) { | ||
queue.on('failed', _.after(3, function () { | ||
queue.getJobs('failed', 'SET').then(function (jobs) { | ||
expect(jobs).to.be.an(Array); | ||
@@ -1538,9 +1672,9 @@ expect(jobs).to.have.length(3); | ||
it('should return subset of jobs when setting positive range', function(done) { | ||
queue.process(function(job, completed) { | ||
it('should return subset of jobs when setting positive range', function (done) { | ||
queue.process(function (job, completed) { | ||
completed(); | ||
}); | ||
queue.on('completed', _.after(3, function() { | ||
queue.getJobs('completed', 'SET', 1, 2).then(function(jobs) { | ||
queue.on('completed', _.after(3, function () { | ||
queue.getJobs('completed', 'SET', 1, 2).then(function (jobs) { | ||
expect(jobs).to.be.an(Array); | ||
@@ -1559,9 +1693,9 @@ expect(jobs).to.have.length(2); | ||
it('should return subset of jobs when setting a negative range', function(done) { | ||
queue.process(function(job, completed) { | ||
it('should return subset of jobs when setting a negative range', function (done) { | ||
queue.process(function (job, completed) { | ||
completed(); | ||
}); | ||
queue.on('completed', _.after(3, function() { | ||
queue.getJobs('completed', 'SET', -3, -1).then(function(jobs) { | ||
queue.on('completed', _.after(3, function () { | ||
queue.getJobs('completed', 'SET', -3, -1).then(function (jobs) { | ||
expect(jobs).to.be.an(Array); | ||
@@ -1581,9 +1715,9 @@ expect(jobs).to.have.length(3); | ||
it('should return subset of jobs when range overflows', function(done) { | ||
queue.process(function(job, completed) { | ||
it('should return subset of jobs when range overflows', function (done) { | ||
queue.process(function (job, completed) { | ||
completed(); | ||
}); | ||
queue.on('completed', _.after(3, function() { | ||
queue.getJobs('completed', 'SET', -300, 99999).then(function(jobs) { | ||
queue.on('completed', _.after(3, function () { | ||
queue.getJobs('completed', 'SET', -300, 99999).then(function (jobs) { | ||
expect(jobs).to.be.an(Array); | ||
@@ -1612,7 +1746,7 @@ expect(jobs).to.have.length(3); | ||
afterEach(function(){ | ||
afterEach(function () { | ||
return queue.close(); | ||
}); | ||
it('should reject the cleaner with no grace', function(done){ | ||
it('should reject the cleaner with no grace', function (done) { | ||
queue.clean().then(function () { | ||
@@ -1648,4 +1782,4 @@ done(new Error('Promise should not resolve')); | ||
it('should clean two jobs from the queue', function (done) { | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
queue.process(function (job, jobDone) { | ||
@@ -1668,6 +1802,6 @@ jobDone(); | ||
}); | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
Promise.delay(200).then(function () { | ||
queue.add({some: 'data'}); | ||
queue.add({ some: 'data' }); | ||
queue.clean(100); | ||
@@ -1685,4 +1819,4 @@ }).delay(100).then(function () { | ||
it('should clean all failed jobs', function (done) { | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
queue.process(function (job, jobDone) { | ||
@@ -1696,3 +1830,3 @@ jobDone(new Error('It failed')); | ||
return queue.count(); | ||
}).then(function(len) { | ||
}).then(function (len) { | ||
expect(len).to.be(0); | ||
@@ -1704,4 +1838,4 @@ done(); | ||
it('should clean all waiting jobs', function (done) { | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
Promise.delay(100).then(function () { | ||
@@ -1712,3 +1846,3 @@ return queue.clean(0, 'wait'); | ||
return queue.count(); | ||
}).then(function(len) { | ||
}).then(function (len) { | ||
expect(len).to.be(0); | ||
@@ -1720,4 +1854,4 @@ done(); | ||
it('should clean all delayed jobs', function (done) { | ||
queue.add({some: 'data'}, { delay: 5000 }); | ||
queue.add({some: 'data'}, { delay: 5000 }); | ||
queue.add({ some: 'data' }, { delay: 5000 }); | ||
queue.add({ some: 'data' }, { delay: 5000 }); | ||
Promise.delay(100).then(function () { | ||
@@ -1728,3 +1862,3 @@ return queue.clean(0, 'delayed'); | ||
return queue.count(); | ||
}).then(function(len) { | ||
}).then(function (len) { | ||
expect(len).to.be(0); | ||
@@ -1736,5 +1870,5 @@ done(); | ||
it('should clean the number of jobs requested', function (done) { | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
Promise.delay(100).then(function () { | ||
@@ -1745,3 +1879,3 @@ return queue.clean(0, 'wait', 1); | ||
return queue.count(); | ||
}).then(function(len) { | ||
}).then(function (len) { | ||
expect(len).to.be(2); | ||
@@ -1755,4 +1889,4 @@ done(); | ||
queue.add({some: 'data'}); | ||
queue.add({some: 'data'}); | ||
queue.add({ some: 'data' }); | ||
queue.add({ some: 'data' }); | ||
queue.process(function (job, jobDone) { | ||
@@ -1763,6 +1897,6 @@ jobDone(new Error('It failed')); | ||
Promise.delay(100).then(function () { | ||
return new Promise(function(resolve) { | ||
return new Promise(function (resolve) { | ||
client.hdel('bull:' + queue.name + ':1', 'timestamp', resolve); | ||
}); | ||
}).then(function() { | ||
}).then(function () { | ||
return queue.clean(0, 'failed'); | ||
@@ -1772,3 +1906,3 @@ }).then(function (jobs) { | ||
return queue.getFailed(); | ||
}).then(function(failed) { | ||
}).then(function (failed) { | ||
expect(failed.length).to.be(0); | ||
@@ -1779,2 +1913,2 @@ done(); | ||
}); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
5625
737
210870
7
26
12
+ Addeddisturbed@^1.0.3
+ Addeddisturbed@1.0.6(transitive)
Updatedbluebird@^3.4.6
Updatedlodash@^4.16.6
Updatedredis@^2.6.3
Updatedsemver@^5.3.0