Comparing version 1.1.3 to 2.0.0
@@ -0,1 +1,9 @@ | ||
v.2.0.0 | ||
======= | ||
- Changed redis module to ioredis fixing many issues along the way, see changes. | ||
[Changes](https://github.com/OptimalBits/bull/compare/v1.1.3...v2.0.0) | ||
v.1.1.3 | ||
@@ -2,0 +10,0 @@ ======= |
@@ -8,3 +8,2 @@ /*eslint-env node */ | ||
var debuglog = require('debuglog')('bull'); | ||
var uuid = require('node-uuid'); | ||
@@ -63,7 +62,7 @@ /** | ||
} | ||
return queue.client.hgetallAsync(queue.toKey(jobId)).then(function(jobData){ | ||
if(jobData){ | ||
return queue.client.hgetall(queue.toKey(jobId)).then(function(jobData){ | ||
if(!_.isEmpty(jobData)){ | ||
return Job.fromData(queue, jobId, jobData); | ||
}else{ | ||
return jobData; | ||
return null; | ||
} | ||
@@ -90,3 +89,3 @@ }); | ||
var _this = this; | ||
return this.queue.client.hsetAsync(this.queue.toKey(this.jobId), 'progress', progress).then(function(){ | ||
return this.queue.client.hset(this.queue.toKey(this.jobId), 'progress', progress).then(function(){ | ||
_this.queue.distEmit('progress', _this.toJSON(), progress); | ||
@@ -103,3 +102,3 @@ }); | ||
Job.prototype.toJSON = function(){ | ||
var opts = this.opts || {}; | ||
var opts = _.extend({}, this.opts || {}); | ||
opts.jobId = this.jobId; | ||
@@ -130,5 +129,8 @@ return { | ||
*/ | ||
Job.prototype.takeLock = function(token, renew, ensureActive){ | ||
return scripts.takeLock(this.queue, this, token, renew, ensureActive).then(function(res){ | ||
return res === 1; // Indicates successful lock. | ||
Job.prototype.takeLock = function(renew, ensureActive){ | ||
var _this = this; | ||
return scripts.takeLock(this.queue, this, renew, ensureActive) | ||
.then(function(lock) { | ||
if (lock) _this.lock = lock; | ||
return lock || false; | ||
}); | ||
@@ -140,4 +142,4 @@ }; | ||
*/ | ||
Job.prototype.renewLock = function(token){ | ||
return this.takeLock(token, true /* Renew */); | ||
Job.prototype.renewLock = function(){ | ||
return this.takeLock(true /* Renew */); | ||
}; | ||
@@ -148,4 +150,6 @@ | ||
*/ | ||
Job.prototype.releaseLock = function(token){ | ||
return scripts.releaseLock(this, token); | ||
Job.prototype.releaseLock = function(){ | ||
var _this = this; | ||
return scripts.releaseLock(this) | ||
.then(function() { _this.lock = null; }); | ||
}; | ||
@@ -165,8 +169,8 @@ | ||
Job.prototype.moveToCompleted = function(returnValue, token){ | ||
Job.prototype.moveToCompleted = function(returnValue){ | ||
this.returnvalue = returnValue || 0; | ||
return scripts.moveToCompleted(this, token || 0, this.opts.removeOnComplete); | ||
return scripts.moveToCompleted(this || 0, this.opts.removeOnComplete); | ||
}; | ||
Job.prototype.move = function(src, target, token, returnValue){ | ||
Job.prototype.move = function(src, target, returnValue){ | ||
if(target === 'completed'){ | ||
@@ -178,3 +182,3 @@ this.returnvalue = returnValue || 0; | ||
} | ||
return scripts.move(this, token || 0, src, target); | ||
return scripts.move(this || 0, src, target); | ||
} | ||
@@ -223,3 +227,3 @@ | ||
return queue.client.evalAsync( | ||
return queue.client.eval( | ||
script, | ||
@@ -269,3 +273,3 @@ keys.length, | ||
return this.queue.client | ||
.zrankAsync(this.queue.toKey('delayed'), this.jobId).then(function(rank) { | ||
.zrank(this.queue.toKey('delayed'), this.jobId).then(function(rank) { | ||
return rank !== null; | ||
@@ -319,9 +323,7 @@ }); | ||
*/ | ||
Job.prototype.remove = function(token){ | ||
Job.prototype.remove = function(){ | ||
var queue = this.queue; | ||
var job = this; | ||
var token = token || uuid(); | ||
return job.takeLock(token).then(function(lock) { | ||
return job.takeLock().then(function(lock) { | ||
if (!lock) { | ||
@@ -335,3 +337,3 @@ throw new Error('Could not get lock for job: ' + job.jobId + '. Cannot remove job.'); | ||
.finally(function () { | ||
return job.releaseLock(token); | ||
return job.releaseLock(); | ||
}); | ||
@@ -415,3 +417,3 @@ }); | ||
return this.queue.client | ||
.sismemberAsync(this.queue.toKey(list), this.jobId).then(function(isMember){ | ||
.sismember(this.queue.toKey(list), this.jobId).then(function(isMember){ | ||
return isMember === 1; | ||
@@ -422,18 +424,3 @@ }); | ||
Job.prototype._isInList = function(list) { | ||
var script = [ | ||
'local function item_in_list (list, item)', | ||
' for _, v in pairs(list) do', | ||
' if v == item then', | ||
' return 1', | ||
' end', | ||
' end', | ||
' return nil', | ||
'end', | ||
'local items = redis.call("LRANGE", KEYS[1], 0, -1)', | ||
'return item_in_list(items, ARGV[1])' | ||
].join('\n'); | ||
return this.queue.client.evalAsync(script, 1, this.queue.toKey(list), this.jobId).then(function(result){ | ||
return result === 1; | ||
}); | ||
return scripts.isJobInList(this.queue.client, this.queue.toKey(list), this.jobId); | ||
}; | ||
@@ -484,3 +471,3 @@ | ||
return queue.client.evalAsync( | ||
return queue.client.eval( | ||
script, | ||
@@ -514,3 +501,3 @@ keys.length, | ||
return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params); | ||
return this.queue.client.hmset(this.queue.toKey(this.jobId), params); | ||
}; | ||
@@ -517,0 +504,0 @@ |
172
lib/queue.js
/*eslint-env node */ | ||
'use strict'; | ||
var redis = require('redis'); | ||
var redis = require('ioredis'); | ||
var Disturbed = require('disturbed'); | ||
@@ -14,8 +14,5 @@ var util = require('util'); | ||
var Promise = require('bluebird'); | ||
var uuid = require('node-uuid'); | ||
var semver = require('semver'); | ||
var debuglog = require('debuglog')('bull'); | ||
Promise.promisifyAll(redis.RedisClient.prototype); | ||
Promise.promisifyAll(redis.Multi.prototype); | ||
@@ -62,2 +59,6 @@ /** | ||
var REDLOCK_DRIFT_FACTOR = 0.01; | ||
var REDLOCK_RETRY_COUNT = 0; | ||
var REDLOCK_RETRY_DELAY = 200; | ||
var Queue = function Queue(name, redisPort, redisHost, redisOptions){ | ||
@@ -74,3 +75,3 @@ if(!(this instanceof Queue)){ | ||
redisOptions = redisOpts.opts || {}; | ||
redisOptions.db = redisOpts.DB; | ||
redisOptions.db = redisOpts.DB || redisOpts.DB; | ||
} else if(parseInt(redisPort) == redisPort) { | ||
@@ -96,3 +97,2 @@ redisPort = parseInt(redisPort); | ||
redisOptions = redisOptions || {}; | ||
var redisDB = redisOptions.db || 0; | ||
@@ -131,2 +131,16 @@ function createClient() { | ||
// | ||
// Keep track of cluster clients for redlock | ||
// | ||
this.clients = [this.client]; | ||
if (redisOptions.clients) { | ||
this.clients.push.apply(this.clients, redisOptions.clients); | ||
} | ||
this.redlock = { | ||
driftFactor: REDLOCK_DRIFT_FACTOR, | ||
retryCount: REDLOCK_RETRY_COUNT, | ||
retryDelay: REDLOCK_RETRY_DELAY | ||
}; | ||
_.extend(this.redlock, redisOptions.redlock || {}); | ||
// | ||
// Create blocking client (used to wait for jobs) | ||
@@ -144,3 +158,2 @@ // | ||
this.token = uuid(); | ||
this.LOCK_RENEW_TIME = LOCK_RENEW_TIME; | ||
@@ -161,10 +174,13 @@ this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL; | ||
// emit ready when redis connections ready | ||
this._initializing = Promise.join( | ||
this.client.selectAsync(redisDB), | ||
this.bclient.selectAsync(redisDB), | ||
this.eclient.selectAsync(redisDB) | ||
).then(function(){ | ||
var initializers = [this.client, this.bclient, this.eclient].map(function (client) { | ||
return new Promise(function(resolve) { | ||
client.once('ready', resolve); | ||
}); | ||
}); | ||
this._initializing = Promise.all(initializers) | ||
.then(function(){ | ||
return Promise.join( | ||
_this.eclient.subscribeAsync(_this.toKey('delayed')), | ||
_this.eclient.subscribeAsync(_this.toKey('paused')) | ||
_this.eclient.subscribe(_this.toKey('delayed')), | ||
_this.eclient.subscribe(_this.toKey('paused')) | ||
); | ||
@@ -196,3 +212,3 @@ }).then(function(){ | ||
if(eventName !== 'cleaned'){ | ||
if(eventName !== 'cleaned' && eventName !== 'error'){ | ||
args[0] = Job.fromJSON(_this, args[0]); | ||
@@ -268,3 +284,5 @@ } | ||
var args = Array.prototype.slice.call(arguments); | ||
Disturbed.prototype.on.apply(this, args); | ||
var promise = Disturbed.prototype.on.apply(this, args); | ||
var _this = this; | ||
promise.catch(function(err){ _this.emit('error', err); }); | ||
return this; | ||
@@ -297,4 +315,4 @@ }; | ||
return Promise.join( | ||
_this.client.quitAsync(), | ||
_this.eclient.quitAsync() | ||
_this.client.quit(), | ||
_this.eclient.quit() | ||
).then(endClients, endClients); | ||
@@ -400,4 +418,4 @@ }; | ||
return multi.execAsync().then(function(res){ | ||
return Math.max(res[0], res[1]) + res[2]; | ||
return multi.exec().then(function(res){ | ||
return Math.max(res[0][1], res[1][1]) + res[2][1]; | ||
}); | ||
@@ -430,3 +448,5 @@ }; | ||
return multi.execAsync().spread(function(waiting, paused){ | ||
return multi.exec().spread(function(waiting, paused){ | ||
waiting = waiting[1]; | ||
paused = paused[1]; | ||
var jobKeys = (paused.concat(waiting)).map(_this.toKey, _this); | ||
@@ -438,3 +458,3 @@ | ||
multi.del.apply(multi, jobKeys); | ||
return multi.execAsync(); | ||
return multi.exec(); | ||
} | ||
@@ -511,3 +531,3 @@ }); | ||
return queue.client.evalAsync(script, keys.length, keys[0], keys[1], keys[2], keys[3], pause ? 'paused' : 'resumed'); | ||
return queue.client.eval(script, keys.length, keys[0], keys[1], keys[2], keys[3], pause ? 'paused' : 'resumed'); | ||
} | ||
@@ -525,7 +545,3 @@ | ||
if (_this.STALLED_JOB_CHECK_INTERVAL > 0){ | ||
clearInterval(_this.moveUnlockedJobsToWaitInterval); | ||
_this.moveUnlockedJobsToWaitInterval = | ||
setInterval(_this.moveUnlockedJobsToWait, _this.STALLED_JOB_CHECK_INTERVAL); | ||
} | ||
_this.startMoveUnlockedJobsToWait(); | ||
@@ -578,22 +594,26 @@ return Promise.all(promises); | ||
if(this.closing){ | ||
return this.closing; | ||
} else{ | ||
return scripts.moveUnlockedJobsToWait(this).then(function(responses){ | ||
var handleFailedJobs = responses[0].map(function(jobId){ | ||
return _this.getJobFromId(jobId).then(function(job){ | ||
_this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit')); | ||
return null; | ||
}); | ||
return scripts.moveUnlockedJobsToWait(this).then(function(responses){ | ||
var handleFailedJobs = responses[0].map(function(jobId){ | ||
return _this.getJobFromId(jobId).then(function(job){ | ||
_this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit')); | ||
return null; | ||
}); | ||
var handleStalledJobs = responses[1].map(function(jobId){ | ||
return _this.getJobFromId(jobId).then(function(job){ | ||
_this.distEmit('stalled', job.toJSON()); | ||
return null; | ||
}); | ||
}); | ||
var handleStalledJobs = responses[1].map(function(jobId){ | ||
return _this.getJobFromId(jobId).then(function(job){ | ||
_this.distEmit('stalled', job.toJSON()); | ||
return null; | ||
}); | ||
return Promise.all(handleFailedJobs.concat(handleStalledJobs)); | ||
}).catch(function(err){ | ||
console.error('Failed to handle unlocked job in active:', err); | ||
}); | ||
return Promise.all(handleFailedJobs.concat(handleStalledJobs)); | ||
}).catch(function(err){ | ||
console.error('Failed to handle unlocked job in active:', err); | ||
}); | ||
}; | ||
Queue.prototype.startMoveUnlockedJobsToWait = function() { | ||
if (this.STALLED_JOB_CHECK_INTERVAL > 0){ | ||
clearInterval(this.moveUnlockedJobsToWaitInterval); | ||
this.moveUnlockedJobsToWaitInterval = | ||
setInterval(this.moveUnlockedJobsToWait, this.STALLED_JOB_CHECK_INTERVAL); | ||
} | ||
@@ -640,7 +660,4 @@ }; | ||
var lockRenewer = function(){ | ||
// The first call to lock the job should ensure that the job is in the 'active' state, | ||
// because it might have gotten picked up already by another processor. We don't need | ||
// to do this on subsequent calls. | ||
return job.takeLock(_this.token, renew, !renew).then(function(locked){ | ||
if(locked){ | ||
return job.takeLock(renew, true).then(function(lock){ | ||
if(lock){ | ||
renew = true; | ||
@@ -651,3 +668,3 @@ lockRenewId = _this.timers.set('lockRenewer', _this.LOCK_RENEW_TIME / 2, lockRenewer); | ||
// handler know and cancel the timer? | ||
return locked; | ||
return lock; | ||
}, function(err){ | ||
@@ -674,6 +691,5 @@ console.error('Error renewing lock ' + err); | ||
return job.moveToCompleted(data, _this.token) | ||
return job.moveToCompleted(data) | ||
.then(function(){ | ||
_this.distEmit('completed', job.toJSON(), data); | ||
return null; // Fixes #253 | ||
return _this.distEmit('completed', job.toJSON(), data); | ||
}); | ||
@@ -686,6 +702,5 @@ } | ||
return job.moveToFailed(err) | ||
.then(job.releaseLock.bind(job, _this.token)) | ||
.then(job.releaseLock.bind(job)) | ||
.then(function(){ | ||
_this.distEmit('failed', job.toJSON(), error); | ||
return null; // Fixes #253 | ||
return _this.distEmit('failed', job.toJSON(), error); | ||
}); | ||
@@ -719,9 +734,11 @@ } | ||
Queue.prototype.getNextJob = function(opts){ | ||
return this.moveJob('wait', 'active', opts).then(this.getJobFromId); | ||
if(!this.closing){ | ||
return this.moveJob('wait', 'active', opts).then(this.getJobFromId); | ||
}else{ | ||
return Promise.reject(); | ||
} | ||
}; | ||
Queue.prototype.multi = function(){ | ||
var multi = this.client.multi(); | ||
multi.execAsync = Promise.promisify(multi.exec); | ||
return multi; | ||
return this.client.multi(); | ||
}; | ||
@@ -738,3 +755,3 @@ | ||
if(!this.closing){ | ||
return this.bclient.rpoplpushAsync(this.toKey(src), this.toKey(dst)); | ||
return this.bclient.rpoplpush(this.toKey(src), this.toKey(dst)); | ||
}else{ | ||
@@ -744,3 +761,3 @@ return Promise.reject(); | ||
}else{ | ||
return this.bclient.brpoplpushAsync( | ||
return this.bclient.brpoplpush( | ||
this.toKey(src), | ||
@@ -795,4 +812,4 @@ this.toKey(dst), | ||
return multi.execAsync().then(function(res){ | ||
return _.reduce(res, function(total, n) { | ||
return multi.exec().then(function(res){ | ||
return _.reduce(res[0], function(total, n) { | ||
return total + n; | ||
@@ -804,23 +821,23 @@ }) || 0; | ||
Queue.prototype.getCompletedCount = function() { | ||
return this.client.scardAsync(this.toKey('completed')); | ||
return this.client.scard(this.toKey('completed')); | ||
}; | ||
Queue.prototype.getFailedCount = function() { | ||
return this.client.scardAsync(this.toKey('failed')); | ||
return this.client.scard(this.toKey('failed')); | ||
}; | ||
Queue.prototype.getDelayedCount = function() { | ||
return this.client.zcardAsync(this.toKey('delayed')); | ||
return this.client.zcard(this.toKey('delayed')); | ||
}; | ||
Queue.prototype.getActiveCount = function() { | ||
return this.client.llenAsync(this.toKey('active')); | ||
return this.client.llen(this.toKey('active')); | ||
}; | ||
Queue.prototype.getWaitingCount = function() { | ||
return this.client.llenAsync(this.toKey('wait')); | ||
return this.client.llen(this.toKey('wait')); | ||
}; | ||
Queue.prototype.getPausedCount = function() { | ||
return this.client.llenAsync(this.toKey('paused')); | ||
return this.client.llen(this.toKey('paused')); | ||
}; | ||
@@ -862,6 +879,6 @@ | ||
case 'LIST': | ||
jobs = this.client.lrangeAsync(key, start, end); | ||
jobs = this.client.lrange(key, start, end); | ||
break; | ||
case 'SET': | ||
jobs = this.client.smembersAsync(key).then(function(jobIds) { | ||
jobs = this.client.smembers(key).then(function(jobIds) { | ||
// Can't set a range for smembers. So do the slice programatically instead. | ||
@@ -877,3 +894,3 @@ // Note that redis ranges are inclusive, so handling for javascript accordingly | ||
case 'ZSET': | ||
jobs = this.client.zrangeAsync(key, start, end); | ||
jobs = this.client.zrange(key, start, end); | ||
break; | ||
@@ -952,9 +969,14 @@ } | ||
resolver = _.after(count, function(){ | ||
_this.removeListener('stalled', resolver); | ||
_this.removeListener('completed', resolver); | ||
_this.removeListener('failed', resolver); | ||
clearInterval(_this.moveUnlockedJobsToWaitInterval); | ||
resolve(); | ||
}); | ||
_this.on('stalled', resolver); | ||
_this.on('completed', resolver); | ||
_this.on('failed', resolver); | ||
_this.startMoveUnlockedJobsToWait(); | ||
} | ||
@@ -969,3 +991,3 @@ }, reject); | ||
var getRedisVersion = function getRedisVersion(client){ | ||
return client.infoAsync().then(function(doc){ | ||
return client.info().then(function(doc){ | ||
var prefix = 'redis_version:'; | ||
@@ -972,0 +994,0 @@ var lines = doc.split('\r\n'); |
@@ -13,2 +13,3 @@ /** | ||
var debuglog = require('debuglog')('bull'); | ||
var Redlock = require('redlock'); | ||
@@ -29,3 +30,3 @@ var cache = {}; | ||
args.unshift(sha); | ||
return client.evalshaAsync.apply(client, args).catch(function(err){ | ||
return client.evalsha.apply(client, args).catch(function(err){ | ||
debuglog(err, script, err.stack); | ||
@@ -47,3 +48,3 @@ if(err.code === 'NOSCRIPT'){ | ||
function cacheScript(client, hash, script){ | ||
cache[hash] = client.scriptAsync('LOAD', script); | ||
cache[hash] = client.script('LOAD', script); | ||
return cache[hash]; | ||
@@ -53,4 +54,7 @@ } | ||
var scripts = { | ||
isJobInList: function(client, listKey, jobId){ | ||
var script = [ | ||
_isJobInList: function(keyVar, argVar, operator) { | ||
keyVar = keyVar || 'KEYS[1]'; | ||
argVar = argVar || 'ARGV[1]'; | ||
operator = operator || 'return'; | ||
return [ | ||
'local function item_in_list (list, item)', | ||
@@ -64,7 +68,8 @@ ' for _, v in pairs(list) do', | ||
'end', | ||
'local items = redis.call("LRANGE", KEYS[1], 0, -1)', | ||
'return item_in_list(items, ARGV[1])' | ||
['local items = redis.call("LRANGE",', keyVar, ' , 0, -1)'].join(''), | ||
[operator, ' item_in_list(items, ', argVar, ')'].join('') | ||
].join('\n'); | ||
return execScript(client, 'isJobInList', script, 1, listKey, jobId).then(function(result){ | ||
}, | ||
isJobInList: function(client, listKey, jobId){ | ||
return execScript(client, 'isJobInList', this._isJobInList(), 1, listKey, jobId).then(function(result){ | ||
return result === 1; | ||
@@ -144,3 +149,3 @@ }); | ||
// of all the specialized functions moveToComplete, etc. | ||
move: function(job, token, src, target){ | ||
move: function(job, src, target){ | ||
// TODO: Depending on the source we should use LREM, SREM or ZREM. | ||
@@ -167,11 +172,5 @@ // TODO: Depending on the target we should use LPUSH, SADD, etc. | ||
'if redis.call("EXISTS", KEYS[3]) == 1 then', // Make sure job exists | ||
' local lock = redis.call("GET", KEYS[4])', | ||
' if (not lock) or (lock == ARGV[3]) then', // Makes sure we own the lock | ||
' redis.call("LREM", KEYS[1], -1, ARGV[1])', | ||
' redis.call("LREM", KEYS[1], -1, ARGV[1])', | ||
target ? moveJob : deleteJob, | ||
' redis.call("DEL", KEYS[4])', | ||
' return 0', | ||
' else', | ||
' return -2', | ||
' end', | ||
' return 0', | ||
'else', | ||
@@ -192,7 +191,17 @@ ' return -1', | ||
job.jobId, | ||
job.returnvalue ? JSON.stringify(job.returnvalue) : '', | ||
token | ||
job.returnvalue ? JSON.stringify(job.returnvalue) : '' | ||
]; | ||
return execScript.apply(scripts, args).then(function(result){ | ||
var returnLockOrErrorCode = function(lock) { | ||
return lock ? execScript.apply(scripts, args) : -2; | ||
}; | ||
var throwUnexpectedErrors = function(err) { | ||
if (!(err instanceof Redlock.LockError)) { | ||
throw err; | ||
} | ||
}; | ||
return job.takeLock(!!job.lock) | ||
.then(returnLockOrErrorCode, throwUnexpectedErrors) | ||
.then(function(result){ | ||
switch (result){ | ||
@@ -207,8 +216,9 @@ case -1: | ||
throw new Error('Cannot get lock for job ' + job.jobId + ' when trying to move from ' + src); | ||
default: | ||
return job.releaseLock() | ||
} | ||
}); | ||
}, | ||
moveToCompleted: function(job, token, removeOnComplete){ | ||
return scripts.move(job, token, 'active', removeOnComplete ? void 0 : 'completed'); | ||
moveToCompleted: function(job, removeOnComplete){ | ||
return scripts.move(job, 'active', removeOnComplete ? void 0 : 'completed'); | ||
}, | ||
@@ -306,89 +316,49 @@ | ||
* @param {Job} job The job | ||
* @param {Boolean=false} renew Whether to renew to lock, meaning it will assume the job | ||
* @param {Boolean=false} renew Whether to renew to lock, meaning it will assume the job | ||
* is already locked and just reset the lock expiration. | ||
* @param {Boolean=false} ensureActive Ensures that the job is in the 'active' state. | ||
*/ | ||
takeLock: function(queue, job, token, renew, ensureActive){ | ||
// Ensures that the lock doesn't exist, or if it does, that we own it. | ||
var ensureOwnershipCall = [ | ||
'local prevLock = redis.call("GET", KEYS[1])', | ||
'if (prevLock and prevLock ~= ARGV[1]) then', | ||
' return 0', | ||
'end' | ||
].join('\n'); | ||
takeLock: function(queue, job, renew, ensureActive){ | ||
var lock = job.lock; | ||
if (renew && !lock) { | ||
throw new Error('Unable to renew nonexisting lock'); | ||
} | ||
if (renew) { | ||
return lock.extend(queue.LOCK_RENEW_TIME); | ||
} | ||
if (lock) { | ||
return Promise.resolve(null); | ||
} | ||
// Ensures that the lock in the 'active' state. | ||
var ensureActiveCall = [ | ||
// Note that while this is inefficient to run a O(n) traversal of the 'active' queue, | ||
// it's highly likely that the job is within the first few elements of the active | ||
// list at the time this call is used. | ||
'local activeJobs = redis.call("LRANGE", KEYS[3], 0, -1)', | ||
'local found = false', | ||
'for _, job in ipairs(activeJobs) do', | ||
' if(job == ARGV[3]) then', | ||
' found = true', | ||
' break', | ||
' end', | ||
'end', | ||
'if (found == false) then', | ||
' return 0', | ||
'end' | ||
].join('\n'); | ||
var lockCall; | ||
if (renew){ | ||
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])'; | ||
var redlock; | ||
if (ensureActive) { | ||
var keyVar = ['"', job.queue.toKey('active'), '"'].join(''); | ||
var argVar = ['"', job.jobId, '"'].join(''); | ||
var isJobInList = this._isJobInList(keyVar, argVar, 'if'); | ||
var lockAcquired = ['and redis.call("HSET", "', queue.toKey(job.jobId), '", "lockAcquired", "1")'].join(''); | ||
var success = 'then return 1 else return 0 end'; | ||
var opts = { | ||
lockScript: function(lockScript) { | ||
return [ | ||
isJobInList, | ||
lockScript.replace('return', 'and'), | ||
lockAcquired, | ||
success | ||
].join('\n'); | ||
} | ||
}; | ||
redlock = new Redlock(queue.clients, _.extend(opts, queue.redlock)); | ||
} else { | ||
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")'; | ||
redlock = new Redlock(queue.clients, queue.redlock); | ||
} | ||
var script = [ | ||
(renew ? ensureOwnershipCall : ''), | ||
(ensureActive ? ensureActiveCall : ''), | ||
'if(' + lockCall + ') then', | ||
// Mark the job as having been locked at least once. Used to determine if the job was stalled. | ||
' redis.call("HSET", KEYS[2], "lockAcquired", "1")', | ||
' return 1', | ||
'else', | ||
' return 0', | ||
'end' | ||
].join('\n'); | ||
var args = [ | ||
queue.client, | ||
'takeLock' + (renew ? 'Renew' : '') + (ensureActive ? 'EnsureActive' : ''), | ||
script, | ||
3, | ||
job.lockKey(), | ||
queue.toKey(job.jobId), | ||
queue.toKey('active'), | ||
token, | ||
queue.LOCK_RENEW_TIME, | ||
job.jobId | ||
]; | ||
return execScript.apply(scripts, args); | ||
return redlock.lock(job.lockKey(), queue.LOCK_RENEW_TIME); | ||
}, | ||
releaseLock: function(job, token){ | ||
var script = [ | ||
'if redis.call("get", KEYS[1]) == ARGV[1]', | ||
'then', | ||
' return redis.call("del", KEYS[1])', | ||
'else', | ||
' return 0', | ||
'end'].join('\n'); | ||
var args = [ | ||
job.queue.client, | ||
'releaseLock', | ||
script, | ||
1, | ||
job.lockKey(), | ||
token | ||
]; | ||
return execScript.apply(scripts, args).then(function(result){ | ||
return result === 1; | ||
}); | ||
releaseLock: function(job){ | ||
var lock = job.lock; | ||
if (!lock) { | ||
throw new Error('Unable to release nonexisting lock'); | ||
} | ||
return lock.unlock() | ||
}, | ||
@@ -661,3 +631,3 @@ /** | ||
return multi.execAsync().spread(function(waiting, paused){ | ||
return multi.exec().spread(function(waiting, paused){ | ||
var jobKeys = (paused.concat(waiting)).map(_this.toKey, _this); | ||
@@ -669,3 +639,3 @@ | ||
multi.del.apply(multi, jobKeys); | ||
return multi.execAsync(); | ||
return multi.exec(); | ||
} | ||
@@ -672,0 +642,0 @@ }); |
@@ -6,3 +6,3 @@ /*eslint-env node */ | ||
var _ = require('lodash'); | ||
var uuid = require('node-uuid'); | ||
var uuid = require('uuid'); | ||
@@ -9,0 +9,0 @@ /** |
{ | ||
"name": "bull", | ||
"version": "1.1.3", | ||
"version": "2.0.0", | ||
"description": "Job manager", | ||
@@ -23,6 +23,7 @@ "main": "index.js", | ||
"disturbed": "^1.0.6", | ||
"lodash": "^4.16.6", | ||
"node-uuid": "^1.4.7", | ||
"redis": "^2.6.3", | ||
"semver": "^5.3.0" | ||
"ioredis": "^2.4.0", | ||
"lodash": "^4.17.2", | ||
"redlock": "^2.1.0", | ||
"semver": "^5.3.0", | ||
"uuid": "^3.0.0" | ||
}, | ||
@@ -29,0 +30,0 @@ "devDependencies": { |
@@ -9,3 +9,3 @@ Bull Job Manager | ||
<img src="https://image.freepik.com/free-icon/strong-bull-side-view_318-52710.jpg", width="200" /> | ||
<img src="https://image.freepik.com/free-icon/strong-bull-side-view_318-52710.jpg" width="200" /> | ||
@@ -12,0 +12,0 @@ The fastest, more reliable redis based queue for nodejs. |
@@ -9,4 +9,3 @@ /*eslint-env node */ | ||
var expect = require('expect.js'); | ||
var Promise = require('bluebird'); | ||
var redis = require('redis'); | ||
var redis = require('ioredis'); | ||
@@ -24,4 +23,3 @@ var STD_QUEUE_NAME = 'cluster test queue'; | ||
var client = redis.createClient(6379, '127.0.0.1', {}); | ||
client = Promise.promisifyAll(client); | ||
client.selectAsync(0); | ||
client.select(0); | ||
@@ -33,3 +31,3 @@ var script = [ | ||
return queue.client.evalAsync( | ||
return queue.client.eval( | ||
script, | ||
@@ -51,14 +49,17 @@ 0, | ||
var workers = []; | ||
var worker; | ||
var _i = 0; | ||
for(_i; _i < os.cpus().length - 1; _i++) { | ||
worker = cluster.fork(); | ||
worker.on('message', workerMessageHandlerWrapper); | ||
workers.push(worker); | ||
console.log('Worker spawned: #', worker.id); | ||
} | ||
describe('Cluster', function () { | ||
var workers = []; | ||
before(function() { | ||
var worker; | ||
var _i = 0; | ||
for(_i; _i < os.cpus().length - 1; _i++) { | ||
worker = cluster.fork(); | ||
worker.on('message', workerMessageHandlerWrapper); | ||
workers.push(worker); | ||
console.log('Worker spawned: #', worker.id); | ||
} | ||
}); | ||
var queue; | ||
@@ -82,2 +83,6 @@ | ||
queue.on('stalled', function(job){ | ||
jobs.splice(jobs.indexOf(job.jobId), 1); | ||
}); | ||
workerMessageHandler = function(job) { | ||
@@ -164,3 +169,3 @@ jobs.push(job.id); | ||
after(function() { | ||
_i = 0; | ||
var _i = 0; | ||
for(_i; _i < workers.length; _i++) { | ||
@@ -167,0 +172,0 @@ workers[_i].kill(); |
@@ -7,6 +7,4 @@ /*eslint-env node */ | ||
var sinon = require('sinon'); | ||
var redis = require('redis'); | ||
var Promise = require('bluebird'); | ||
var redis = require('ioredis'); | ||
Promise.promisifyAll(redis.RedisClient.prototype); | ||
@@ -19,3 +17,3 @@ describe('connection', function () { | ||
var client = redis.createClient(); | ||
return client.flushdbAsync().then(function(){ | ||
return client.flushdb().then(function(){ | ||
queue = utils.buildQueue(); | ||
@@ -31,4 +29,4 @@ }); | ||
jobDone(); | ||
// We do not wait since this close is expected to fail... | ||
queue.close(); | ||
}).then(function() { | ||
done(); | ||
@@ -35,0 +33,0 @@ }).catch(function(err){ |
@@ -7,8 +7,7 @@ /*eslint-env node */ | ||
var expect = require('expect.js'); | ||
var redis = require('redis'); | ||
var redis = require('ioredis'); | ||
var Promise = require('bluebird'); | ||
var uuid = require('node-uuid'); | ||
var uuid = require('uuid'); | ||
var Redlock = require('redlock'); | ||
Promise.promisifyAll(redis.RedisClient.prototype); | ||
Promise.promisifyAll(redis.Multi.prototype); | ||
@@ -20,3 +19,3 @@ describe('Job', function(){ | ||
var client = redis.createClient(); | ||
return client.flushdbAsync(); | ||
return client.flushdb(); | ||
}); | ||
@@ -29,2 +28,3 @@ | ||
afterEach(function(){ | ||
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); | ||
return queue.close(); | ||
@@ -52,2 +52,6 @@ }); | ||
it('should not modify input options', function() { | ||
expect(opts).not.to.have.property('jobId'); | ||
}); | ||
it('saves the job in redis', function () { | ||
@@ -102,8 +106,7 @@ return Job.fromId(queue, job.jobId).then(function(storedJob){ | ||
it('fails to remove a locked job', function() { | ||
var token = uuid(); | ||
return Job.create(queue, 1, {foo: 'bar'}).then(function(job) { | ||
return job.takeLock(token).then(function(lock) { | ||
expect(lock).to.be(true); | ||
return job.takeLock().then(function(lock) { | ||
expect(lock).to.be.a(Redlock.Lock); | ||
}).then(function() { | ||
return job.remove(token); | ||
return job.remove(); | ||
}).then(function() { | ||
@@ -201,7 +204,7 @@ throw new Error('Should not be able to remove a locked job'); | ||
it('can take a lock', function(){ | ||
return job.takeLock('423').then(function(lockTaken){ | ||
expect(lockTaken).to.be(true); | ||
return job.takeLock().then(function(lockTaken){ | ||
expect(lockTaken).to.be.a(Redlock.Lock); | ||
}).then(function(){ | ||
return job.releaseLock('321').then(function(lockReleased){ | ||
expect(lockReleased).to.be(false); | ||
return job.releaseLock().then(function(lockReleased){ | ||
expect(lockReleased).to.not.exist; | ||
}); | ||
@@ -212,6 +215,6 @@ }); | ||
it('cannot take an already taken lock', function(){ | ||
return job.takeLock('1234').then(function(lockTaken){ | ||
expect(lockTaken).to.be(true); | ||
return job.takeLock().then(function(lockTaken){ | ||
expect(lockTaken).to.be.a(Redlock.Lock); | ||
}).then(function(){ | ||
return job.takeLock('1234').then(function(lockTaken){ | ||
return job.takeLock().then(function(lockTaken){ | ||
expect(lockTaken).to.be(false); | ||
@@ -223,7 +226,7 @@ }); | ||
it('can renew a previously taken lock', function(){ | ||
return job.takeLock('1235').then(function(lockTaken){ | ||
expect(lockTaken).to.be(true); | ||
return job.takeLock().then(function(lockTaken){ | ||
expect(lockTaken).to.be.a(Redlock.Lock); | ||
}).then(function(){ | ||
return job.renewLock('1235').then(function(lockRenewed){ | ||
expect(lockRenewed).to.be(true); | ||
return job.renewLock().then(function(lockRenewed){ | ||
expect(lockRenewed).to.be.a(Redlock.Lock); | ||
}); | ||
@@ -234,12 +237,8 @@ }); | ||
it('can release a lock', function(){ | ||
return job.takeLock('1237').then(function(lockTaken){ | ||
expect(lockTaken).to.be(true); | ||
return job.takeLock().then(function(lockTaken){ | ||
expect(lockTaken).to.be.a(Redlock.Lock); | ||
}).then(function(){ | ||
return job.releaseLock('321').then(function(lockReleased){ | ||
expect(lockReleased).to.be(false); | ||
return job.releaseLock().then(function(lockReleased){ | ||
expect(lockReleased).to.not.exist; | ||
}); | ||
}).then(function(){ | ||
return job.releaseLock('1237').then(function(lockReleased){ | ||
expect(lockReleased).to.be(true); | ||
}); | ||
}); | ||
@@ -393,3 +392,3 @@ }); | ||
var client = Promise.promisifyAll(redis.createClient()); | ||
var client = redis.createClient(); | ||
return Job.create(queue, {foo: 'baz'}).then(function(job) { | ||
@@ -409,3 +408,3 @@ return job.isStuck().then(function(isStuck) { | ||
expect(state).to.be('completed'); | ||
return client.sremAsync(queue.toKey('completed'), job.jobId); | ||
return client.srem(queue.toKey('completed'), job.jobId); | ||
}).then(function(){ | ||
@@ -420,3 +419,3 @@ return job.moveToDelayed(Date.now() + 10000); | ||
expect(state).to.be('delayed'); | ||
return client.zremAsync(queue.toKey('delayed'), job.jobId); | ||
return client.zrem(queue.toKey('delayed'), job.jobId); | ||
}).then(function() { | ||
@@ -431,3 +430,3 @@ return job.moveToFailed(new Error('test')); | ||
expect(state).to.be('failed'); | ||
return client.sremAsync(queue.toKey('failed'), job.jobId); | ||
return client.srem(queue.toKey('failed'), job.jobId); | ||
}).then(function(res) { | ||
@@ -438,5 +437,5 @@ expect(res).to.be(1); | ||
expect(state).to.be('stuck'); | ||
return client.rpopAsync(queue.toKey('wait')); | ||
return client.rpop(queue.toKey('wait')); | ||
}).then(function(){ | ||
return client.lpushAsync(queue.toKey('paused'), job.jobId); | ||
return client.lpush(queue.toKey('paused'), job.jobId); | ||
}).then(function() { | ||
@@ -449,5 +448,5 @@ return job.isPaused(); | ||
expect(state).to.be('paused'); | ||
return client.rpopAsync(queue.toKey('paused')); | ||
return client.rpop(queue.toKey('paused')); | ||
}).then(function() { | ||
return client.lpushAsync(queue.toKey('wait'), job.jobId); | ||
return client.lpush(queue.toKey('wait'), job.jobId); | ||
}).then(function() { | ||
@@ -454,0 +453,0 @@ return job.isWaiting(); |
@@ -10,4 +10,4 @@ /// <reference path='../typings/mocha/mocha.d.ts'/> | ||
var _ = require('lodash'); | ||
var uuid = require('node-uuid'); | ||
var redis = require('redis'); | ||
var uuid = require('uuid'); | ||
var redis = require('ioredis'); | ||
@@ -31,3 +31,3 @@ var STD_QUEUE_NAME = 'test queue'; | ||
var client = redis.createClient(); | ||
return client.flushdbAsync(); | ||
return client.flushdb(); | ||
}); | ||
@@ -34,0 +34,0 @@ |
@@ -7,10 +7,8 @@ /*eslint-env node */ | ||
var Promise = require('bluebird'); | ||
var redis = require('redis'); | ||
var redis = require('ioredis'); | ||
var sinon = require('sinon'); | ||
var _ = require('lodash'); | ||
var uuid = require('node-uuid'); | ||
var uuid = require('uuid'); | ||
var utils = require('./utils'); | ||
Promise.promisifyAll(redis.RedisClient.prototype); | ||
Promise.promisifyAll(redis.Multi.prototype); | ||
@@ -36,3 +34,3 @@ /* | ||
var client = redis.createClient(); | ||
return client.flushdbAsync(); | ||
return client.flushdb(); | ||
}); | ||
@@ -74,10 +72,10 @@ | ||
it('should resolve the promise when each client has disconnected', function () { | ||
expect(testQueue.client.connected).to.be(true); | ||
expect(testQueue.bclient.connected).to.be(true); | ||
expect(testQueue.eclient.connected).to.be(true); | ||
expect(testQueue.client.status).to.be('ready'); | ||
expect(testQueue.bclient.status).to.be('ready'); | ||
expect(testQueue.eclient.status).to.be('ready'); | ||
return testQueue.close().then(function () { | ||
expect(testQueue.client.connected).to.be(false); | ||
expect(testQueue.bclient.connected).to.be(false); | ||
expect(testQueue.eclient.connected).to.be(false); | ||
expect(testQueue.client.status).to.be('end'); | ||
expect(testQueue.bclient.status).to.be('end'); | ||
expect(testQueue.eclient.status).to.be('end'); | ||
}); | ||
@@ -94,2 +92,3 @@ }); | ||
it('should close if the job expires after the LOCK_RENEW_TIME', function (done) { | ||
this.timeout(testQueue.STALLED_JOB_CHECK_INTERVAL * 2); | ||
testQueue.LOCK_RENEW_TIME = 10; | ||
@@ -145,10 +144,10 @@ testQueue.process(function () { | ||
queue.once('ready', function () { | ||
expect(queue.client.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.client.options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.options.host).to.be('127.0.0.1'); | ||
expect(queue.client.connection_options.port).to.be(6379); | ||
expect(queue.bclient.connection_options.port).to.be(6379); | ||
expect(queue.client.options.port).to.be(6379); | ||
expect(queue.bclient.options.port).to.be(6379); | ||
expect(queue.client.selected_db).to.be(0); | ||
expect(queue.bclient.selected_db).to.be(0); | ||
expect(queue.client.options.db).to.be(0); | ||
expect(queue.bclient.options.db).to.be(0); | ||
@@ -163,10 +162,10 @@ queue.close().then(done); | ||
queue.once('ready', function () { | ||
expect(queue.client.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.client.options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.options.host).to.be('127.0.0.1'); | ||
expect(queue.client.connection_options.port).to.be(6379); | ||
expect(queue.bclient.connection_options.port).to.be(6379); | ||
expect(queue.client.options.port).to.be(6379); | ||
expect(queue.bclient.options.port).to.be(6379); | ||
expect(queue.client.selected_db).to.be(0); | ||
expect(queue.bclient.selected_db).to.be(0); | ||
expect(queue.client.options.db).to.be(0); | ||
expect(queue.bclient.options.db).to.be(0); | ||
@@ -182,10 +181,10 @@ queue.close().then(done); | ||
queue.once('ready', function () { | ||
expect(queue.client.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.client.options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.options.host).to.be('127.0.0.1'); | ||
expect(queue.client.connection_options.port).to.be(6379); | ||
expect(queue.bclient.connection_options.port).to.be(6379); | ||
expect(queue.client.options.port).to.be(6379); | ||
expect(queue.bclient.options.port).to.be(6379); | ||
expect(queue.client.selected_db).to.be(0); | ||
expect(queue.bclient.selected_db).to.be(0); | ||
expect(queue.client.condition.select).to.be(0); | ||
expect(queue.bclient.condition.select).to.be(0); | ||
@@ -201,10 +200,10 @@ queue.close().then(done); | ||
queue.once('ready', function () { | ||
expect(queue.client.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.connection_options.host).to.be('127.0.0.1'); | ||
expect(queue.client.options.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.options.host).to.be('127.0.0.1'); | ||
expect(queue.client.connection_options.port).to.be(6379); | ||
expect(queue.bclient.connection_options.port).to.be(6379); | ||
expect(queue.client.options.port).to.be(6379); | ||
expect(queue.bclient.options.port).to.be(6379); | ||
expect(queue.client.selected_db).to.be(1); | ||
expect(queue.bclient.selected_db).to.be(1); | ||
expect(queue.client.options.db).to.be(1); | ||
expect(queue.bclient.options.db).to.be(1); | ||
@@ -219,7 +218,7 @@ queue.close().then(done); | ||
queue.once('ready', function () { | ||
expect(queue.client.connection_options.host).to.be('localhost'); | ||
expect(queue.bclient.connection_options.host).to.be('localhost'); | ||
expect(queue.client.options.host).to.be('localhost'); | ||
expect(queue.bclient.options.host).to.be('localhost'); | ||
expect(queue.client.selected_db).to.be(0); | ||
expect(queue.bclient.selected_db).to.be(0); | ||
expect(queue.client.options.db).to.be(0); | ||
expect(queue.bclient.options.db).to.be(0); | ||
@@ -280,3 +279,3 @@ queue.close().then(done); | ||
var client = redis.createClient(); | ||
return client.flushdbAsync().then(function () { | ||
return client.flushdb().then(function () { | ||
return utils.newQueue(); | ||
@@ -289,2 +288,3 @@ }).then(function (_queue) { | ||
afterEach(function () { | ||
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); | ||
return utils.cleanupQueues(); | ||
@@ -298,3 +298,3 @@ }); | ||
done(); | ||
}); | ||
}).catch(done); | ||
queue.add({ foo: 'bar' }).then(function (job) { | ||
@@ -406,3 +406,3 @@ expect(job.jobId).to.be.ok(); | ||
expect(job.returnvalue).to.be.eql(37); | ||
queue.client.hgetAsync(queue.toKey(job.jobId), 'returnvalue').then(function (retval) { | ||
queue.client.hget(queue.toKey(job.jobId), 'returnvalue').then(function (retval) { | ||
expect(JSON.parse(retval)).to.be.eql(37); | ||
@@ -484,7 +484,11 @@ done(); | ||
return queueStalled.close(true).then(function () { | ||
return new Promise(function (resolve) { | ||
return new Promise(function (resolve, reject) { | ||
utils.newQueue('test queue stalled').then(function (queue2) { | ||
queue2.LOCK_RENEW_TIME = 100; | ||
var doneAfterFour = _.after(4, function () { | ||
expect(stalledCallback.calledOnce).to.be(true); | ||
try { | ||
expect(stalledCallback.calledOnce).to.be(true); | ||
} catch (e) { | ||
reject(e); | ||
} | ||
resolve(); | ||
@@ -610,3 +614,3 @@ }); | ||
setTimeout(jobDone, 500); | ||
}); | ||
}).catch(done); | ||
@@ -826,3 +830,3 @@ utils.newQueue().then(function (_anotherQueue) { | ||
var client = redis.createClient(); | ||
return client.flushdbAsync(); | ||
return client.flushdb(); | ||
}); | ||
@@ -891,3 +895,3 @@ | ||
it('should pause the queue locally', function (testDone) { | ||
it('should pause the queue locally', function (done) { | ||
var counter = 2; | ||
@@ -897,14 +901,14 @@ | ||
queue.pause(true /* Local */).then(function () { | ||
queue.pause(true /* Local */).tap(function () { | ||
// Add the worker after the queue is in paused mode since the normal behavior is to pause | ||
// it after the current lock expires. This way, we can ensure there isn't a lock already | ||
// to test that pausing behavior works. | ||
queue.process(function (job, done) { | ||
queue.process(function (job, jobDone) { | ||
expect(queue.paused).not.to.be.ok(); | ||
done(); | ||
jobDone(); | ||
counter--; | ||
if (counter === 0) { | ||
queue.close().then(testDone); | ||
queue.close().then(done); | ||
} | ||
}); | ||
}).catch(done); | ||
}).then(function () { | ||
@@ -918,3 +922,3 @@ return queue.add({ foo: 'paused' }); | ||
return queue.resume(true /* Local */); | ||
}); | ||
}).catch(done); | ||
}); | ||
@@ -934,3 +938,3 @@ | ||
Promise.all(jobs).then(function () { | ||
queue.pause(true).then(function () { | ||
return queue.pause(true).then(function () { | ||
var active = queue.getJobCountByTypes(['active']).then(function (count) { | ||
@@ -967,3 +971,3 @@ expect(count).to.be(0); | ||
}); | ||
}); | ||
}).catch(done); | ||
}); | ||
@@ -1006,3 +1010,3 @@ }); | ||
var client = redis.createClient(); | ||
return client.flushdbAsync(); | ||
return client.flushdb(); | ||
}); | ||
@@ -1055,3 +1059,4 @@ | ||
queue.on('failed', function (err) { | ||
queue.on('failed', function (job, err) { | ||
err.job = job; | ||
done(err); | ||
@@ -1169,6 +1174,7 @@ }); | ||
queue = utils.buildQueue(); | ||
return client.flushdbAsync(); | ||
return client.flushdb(); | ||
}); | ||
afterEach(function () { | ||
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); | ||
return queue.close(); | ||
@@ -1279,2 +1285,3 @@ }); | ||
afterEach(function () { | ||
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); | ||
return queue.close(); | ||
@@ -1507,3 +1514,3 @@ }); | ||
describe.only('Jobs getters', function () { | ||
describe('Jobs getters', function () { | ||
var queue; | ||
@@ -1517,2 +1524,3 @@ | ||
afterEach(function () { | ||
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); | ||
return queue.close(); | ||
@@ -1648,2 +1656,3 @@ }); | ||
afterEach(function () { | ||
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); | ||
return queue.close(); | ||
@@ -1760,2 +1769,3 @@ }); | ||
afterEach(function () { | ||
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT)); | ||
return queue.close(); | ||
@@ -1762,0 +1772,0 @@ }); |
@@ -37,3 +37,5 @@ /*eslint-env node */ | ||
return Promise.map(queues, function(queue){ | ||
return queue.close(); | ||
var errHandler = function() {}; | ||
queue.on('error', errHandler); | ||
return queue.close().catch(errHandler); | ||
}).then(function(){ | ||
@@ -40,0 +42,0 @@ queues = []; |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Uses eval
Supply chain riskPackage uses dynamic code execution (e.g., eval()), which is a dangerous practice. This can prevent the code from running in certain environments and increases the risk that the code may contain exploits or malicious behavior.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Uses eval
Supply chain riskPackage uses dynamic code execution (e.g., eval()), which is a dangerous practice. This can prevent the code from running in certain environments and increases the risk that the code may contain exploits or malicious behavior.
Found 1 instance in 1 package
10
215763
8
5714
+ Addedioredis@^2.4.0
+ Addedredlock@^2.1.0
+ Addeduuid@^3.0.0
+ Addedcluster-key-slot@1.1.2(transitive)
+ Addeddebug@2.6.9(transitive)
+ Addedflexbuffer@0.0.6(transitive)
+ Addedioredis@2.5.0(transitive)
+ Addedms@2.0.0(transitive)
+ Addedredis-parser@1.3.0(transitive)
+ Addedredlock@2.1.2(transitive)
+ Addeduuid@3.4.0(transitive)
- Removednode-uuid@^1.4.7
- Removedredis@^2.6.3
- Removedredis@2.8.0(transitive)
- Removedredis-parser@2.6.0(transitive)
Updatedlodash@^4.17.2