Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 1.0.0 to 1.1.0

11

CHANGELOG.md

@@ -0,1 +1,12 @@

v1.1.0
======
- Fixed [job corruption issue](https://github.com/OptimalBits/bull/pull/359)
- The job id can be [overridden](https://github.com/OptimalBits/bull/pull/335) to implement job throttling behavior
- Added [`removeOnComplete` job option](https://github.com/OptimalBits/bull/pull/361)
- [More robust job retry](https://github.com/OptimalBits/bull/pull/318)
- Events are [now broadcast to all workers](https://github.com/OptimalBits/bull/commit/d55ad1c8f44f86be9b4e9f4fa9a3fc8a16c6e02d)
[Changes](https://github.com/OptimalBits/bull/compare/v1.0.0...v1.1.0)
v1.0.0

@@ -2,0 +13,0 @@ ======

205

lib/job.js

@@ -18,3 +18,3 @@ /*eslint-env node */

// queue: Queue, jobId: string, data: {}, opts: JobOptions
// queue: Queue, data: {}, opts: JobOptions
var Job = function(queue, data, opts){

@@ -44,3 +44,3 @@ opts = opts || {};

var toKey = _.bind(queue.toKey, queue);
return scripts.addJob(queue.client, toKey, job.opts.lifo, jobData);
return scripts.addJob(queue.client, toKey, jobData, { lifo: job.opts.lifo, customJobId: job.opts.jobId });
}

@@ -53,8 +53,4 @@

job.jobId = jobId;
queue.emit('waiting', job);
queue.distEmit('waiting', job.toJSON());
debuglog('Job added', jobId);
//
// TODO: emit event based on pubsub
//_this.emit('delayed', job);
//
return job;

@@ -71,3 +67,3 @@ });

if(jobData){
return Job.fromData(queue, +jobId, jobData);
return Job.fromData(queue, jobId, jobData);
}else{

@@ -97,3 +93,3 @@ return jobData;

return this.queue.client.hsetAsync(this.queue.toKey(this.jobId), 'progress', progress).then(function(){
_this.queue.emit('progress', _this, progress);
_this.queue.distEmit('progress', _this.toJSON(), progress);
});

@@ -105,4 +101,23 @@ }else{

//
// toData and fromData should be deprecated.
//
Job.prototype.toJSON = function(){
var opts = this.opts || {};
opts.jobId = this.jobId;
return {
data: this.data || {},
opts: opts,
progress: this._progress,
delay: this.delay,
timestamp: this.timestamp,
attempts: this.attempts,
attemptsMade: this.attemptsMade,
stacktrace: this.stacktrace || null,
returnvalue: this.returnvalue || null
};
};
/**
Return a unique key representin a lock for this Job
Return a unique key representing a lock for this Job
*/

@@ -118,9 +133,4 @@ Job.prototype.lockKey = function(){

Job.prototype.takeLock = function(token, renew){
var args = [this.lockKey(), token, 'PX', this.queue.LOCK_RENEW_TIME];
if(!renew){
args.push('NX');
}
return this.queue.client.setAsync.apply(this.queue.client, args).then(function(result){
return result === 'OK';
return scripts.takeLock(this.queue, this, token, renew).then(function(res){
return res === 1; // Indicates successful lock.
});

@@ -133,3 +143,3 @@ };

Job.prototype.renewLock = function(token){
return this.takeLock(token, true);
return this.takeLock(token, true /* Renew */);
};

@@ -158,9 +168,18 @@

this.returnvalue = returnValue || 0;
return scripts.moveToCompleted(this, token || 0);
return scripts.moveToCompleted(this, token || 0, this.opts.removeOnComplete);
};
Job.prototype.move = function(src, target, token, returnValue){
if(target === 'completed'){
this.returnvalue = returnValue || 0;
if(this.opts.removeOnComplete){
target = void 0;
}
}
return scripts.move(this, token || 0, src, target);
}
Job.prototype.moveToFailed = function(err){
var _this = this;
this.stacktrace.push(err.stack);
return this._saveAttempt().then(function() {
return this._saveAttempt(err).then(function() {
// Check if an automatic retry should be performed

@@ -216,17 +235,22 @@ if(_this.attemptsMade < _this.attempts){

/**
* Attempts to retry the job. Only a job that has failed can be retried.
*
* @return {Promise} If resolved and return code is 1, then the queue emits a waiting event
* otherwise the operation was not a success and throw the corresponding error. If the promise
* rejects, it indicates that the script failed to execute
*/
Job.prototype.retry = function(){
var key = this.queue.toKey('wait');
var failed = this.queue.toKey('failed');
var channel = this.queue.toKey('jobs');
var multi = this.queue.multi();
var queue = this.queue;
var _this = this;
multi.srem(failed, this.jobId);
// if queue is LIFO use rpushAsync
multi[(this.opts.lifo ? 'r' : 'l') + 'push'](key, this.jobId);
multi.publish(channel, this.jobId);
return multi.execAsync().then(function(){
_this.queue.emit('waiting', _this);
return _this;
return scripts.reprocessJob(this, { state: 'failed' }).then(function(result) {
if (result === 1) {
queue.emit('waiting', _this);
} else if (result === 0) {
throw new Error('Couldn\'t retry job: The job doesn\'t exist');
} else if (result === -1) {
throw new Error('Couldn\'t retry job: The job is locked');
} else if (result === -2) {
throw new Error('Couldn\'t retry job: The job has been already retried or has not failed');
}
});

@@ -304,6 +328,5 @@ };

}
return scripts.remove(queue, job.jobId)
.then(function() {
queue.emit('removed', job);
queue.emit('removed', job.toJSON());
})

@@ -316,2 +339,68 @@ .finally(function () {

/**
* Returns a promise the resolves when the job has been finished.
* TODO: Add a watchdog to check if the job has finished periodically.
* since pubsub does not give any guarantees.
*/
Job.prototype.finished = function(){
var _this = this;
function status(resolve, reject){
return _this.isCompleted().then(function(completed){
if(!completed){
return _this.isFailed().then(function(failed){
if(failed){
return Job.fromId(_this.queue, _this.jobId, 'failedReason').then(function(data){
reject(Error(data.failedReason));
return true;
});
}
});
}
resolve();
return true;
});
}
return new Promise(function(resolve, reject){
status(resolve, reject).then(function(finished){
if(!finished){
function onCompleted(job){
if(job.jobId === _this.jobId){
resolve();
}
removeListeners();
}
function onFailed(job, err){
if(job.jobId === _this.jobId){
reject(err);
}
removeListeners();
}
function removeListeners(){
_this.queue.removeListener('completed', onCompleted);
_this.queue.removeListener('failed', onFailed);
}
_this.queue.on('completed', onCompleted);
_this.queue.on('failed', onFailed);
//
// Watchdog
//
var interval = setInterval(function(){
status(resolve, reject).then(function(finished){
if(finished){
removeListeners();
clearInterval(interval );
}
})
}, 5000);
};
});
});
}
// -----------------------------------------------------------------------------

@@ -403,3 +492,3 @@ // Private methods

Job.prototype._saveAttempt = function(){
Job.prototype._saveAttempt = function(err){
if(isNaN(this.attemptsMade)){

@@ -413,5 +502,8 @@ this.attemptsMade = 1;

};
if(this.stacktrace){
params.stacktrace = JSON.stringify(this.stacktrace);
}
this.stacktrace.push(err.stack);
params.stacktrace = JSON.stringify(this.stacktrace);
params.failedReason = err.message;
return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params);

@@ -428,2 +520,4 @@ };

job.timestamp = parseInt(data.timestamp);
job.failedReason = data.failedReason;
job.attempts = parseInt(data.attempts);

@@ -454,2 +548,33 @@ if(isNaN(job.attempts)) {

Job.fromJSON = function(queue, json){
var job = new Job(queue, json.data, json.opts);
job.jobId = json.opts.jobId;
job._progress = parseInt(json.progress);
job.delay = parseInt(json.delay);
job.timestamp = parseInt(json.timestamp);
job.attempts = parseInt(json.attempts);
if(isNaN(job.attempts)) {
job.attempts = 1; // Default to 1 try for legacy jobs
}
job.attemptsMade = parseInt(json.attemptsMade);
var _traces;
try{
_traces = JSON.parse(json.stacktrace);
if(!(_traces instanceof Array)){
_traces = [];
}
}catch (err){
_traces = [];
}
job.stacktrace = _traces;
try{
job.returnvalue = JSON.parse(json.returnvalue);
}catch (e){
//swallow exception because the returnvalue got corrupted somehow.
debuglog('corrupted returnvalue: ' + json.returnvalue, e);
}
return job;
}
module.exports = Job;

2

lib/priority-queue.js

@@ -123,3 +123,3 @@ "use strict";

var fn = function() {
return queue.processStalledJobs().then(queue.getNextJob.bind(queue, {
return queue.moveUnlockedJobsToWait().then(queue.getNextJob.bind(queue, {
block: false

@@ -126,0 +126,0 @@ }))

@@ -5,4 +5,6 @@ /*eslint-env node */

var redis = require('redis');
var events = require('events');
var Disturbed = require('disturbed');
var util = require('util');
var assert = require('assert');
var url = require('url');
var Job = require('./job');

@@ -16,2 +18,3 @@ var scripts = require('./scripts');

var debuglog = require('debuglog')('bull');
Promise.promisifyAll(redis.RedisClient.prototype);

@@ -49,2 +52,10 @@ Promise.promisifyAll(redis.Multi.prototype);

var LOCK_RENEW_TIME = 5000; // 5 seconds is the renew time.
// The interval for which to check for stalled jobs.
var STALLED_JOB_CHECK_INTERVAL = 5000; // 5 seconds is the renew time.
// The maximum number of times a job can be recovered from the 'stalled' state
// (moved back to 'wait'), before it is failed.
var MAX_STALLED_JOB_COUNT = 1;
var CLIENT_CLOSE_TIMEOUT_MS = 5000;

@@ -65,2 +76,16 @@ var POLLING_INTERVAL = 5000;

redisOptions.db = redisOpts.DB;
} else if(_.isString(redisPort)) {
try {
var redisUrl = url.parse(redisPort);
assert(_.isObject(redisHost) || _.isUndefined(redisHost),
'Expected an object as redis option');
redisOptions = redisHost || {};
redisPort = redisUrl.port;
redisHost = redisUrl.hostname;
if (redisUrl.auth) {
redisOptions.password = redisUrl.auth.split(':')[1];
}
} catch (e) {
throw new Error(e.message);
}
}

@@ -117,2 +142,4 @@

this.LOCK_RENEW_TIME = LOCK_RENEW_TIME;
this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL;
this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT;

@@ -146,14 +173,39 @@ // bubble up Redis error events

Disturbed.call(this, _this.client, _this.eclient);
//
// Listen distributed queue events
//
listenDistEvent('stalled'); //
listenDistEvent('completed'); //
listenDistEvent('failed'); //
listenDistEvent('cleaned');
listenDistEvent('waiting'); //
listenDistEvent('remove'); //
listenDistEvent('progress'); //
function listenDistEvent(eventName){
var _eventName = eventName + '@' + name;
_this.on(_eventName, function(){
var args = Array.prototype.slice.call(arguments);
if(eventName !== 'cleaned'){
args[0] = Job.fromJSON(_this, args[0]);
}
args.unshift(eventName);
_this.emit.apply(_this, args);
});
}
//
// Handle delay, pause and resume messages
//
var delayedKey = _this.toKey('delayed');
var pausedKey = _this.toKey('paused');
this.eclient.on('message', function(channel, message){
if(channel === _this.toKey('delayed')){
if(channel === delayedKey){
_this.updateDelayTimer(message);
}else if(channel === _this.toKey('paused')){
if(message === 'paused'){
_this.emit('paused');
}else if(message === 'resumed'){
_this.emit('resumed');
}
}else if(channel === pausedKey){
_this.emit(message);
}

@@ -190,4 +242,3 @@ });

// in processJobs etc.
this.processStalledJobs = this.processStalledJobs.bind(this);
this.processStalledJob = this.processStalledJob.bind(this);
this.moveUnlockedJobsToWait = this.moveUnlockedJobsToWait.bind(this);
this.getNextJob = this.getNextJob.bind(this);

@@ -199,4 +250,14 @@ this.processJobs = this.processJobs.bind(this);

util.inherits(Queue, events.EventEmitter);
util.inherits(Queue, Disturbed);
/**
*
* Emits a distributed event.
*/
Queue.prototype.distEmit = function(){
var args = Array.prototype.slice.call(arguments);
args[0] = args[0] + '@' + this.name;
return Disturbed.prototype.distEmit.apply(this, args);
}
Queue.prototype.disconnect = function(){

@@ -235,3 +296,3 @@ var _this = this;

clearInterval(_this.guardianTimer);
clearInterval(_this.stalledJobsInterval);
clearInterval(_this.moveUnlockedJobsToWaitInterval);
_this.timers.clearAll();

@@ -435,3 +496,3 @@

return this.processStalledJobs().then(function(){
return this.moveUnlockedJobsToWait().then(function(){

@@ -442,8 +503,7 @@ while(concurrency--){

//
// Set process Stalled jobs intervall
//
clearInterval(_this.stalledJobsInterval);
_this.stalledJobsInterval =
setInterval(_this.processStalledJobs, _this.LOCK_RENEW_TIME);
if (_this.STALLED_JOB_CHECK_INTERVAL > 0){
clearInterval(_this.moveUnlockedJobsToWaitInterval);
_this.moveUnlockedJobsToWaitInterval =
setInterval(_this.moveUnlockedJobsToWait, _this.STALLED_JOB_CHECK_INTERVAL);
}

@@ -489,16 +549,28 @@ return Promise.all(promises);

/**
Process jobs that have been added to the active list but are not being
processed properly.
* Process jobs that have been added to the active list but are not being
* processed properly. This can happen due to a process crash in the middle
* of processing a job, leaving it in 'active' but without a job lock.
*/
Queue.prototype.processStalledJobs = function(){
Queue.prototype.moveUnlockedJobsToWait = function(){
var _this = this;
if(this.closing){
return this.closing;
} else{
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){
return Promise.each(jobs, function(jobId) {
return Job.fromId(_this, jobId).then(_this.processStalledJob);
return scripts.moveUnlockedJobsToWait(this).then(function(responses){
var handleFailedJobs = responses[0].map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit'));
return null;
});
});
var handleStalledJobs = responses[1].map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('stalled', job.toJSON());
return null;
});
});
return Promise.all(handleFailedJobs.concat(handleStalledJobs));
}).catch(function(err){
console.error(err);
console.error('Failed to handle unlocked job in active:', err);
});

@@ -508,20 +580,2 @@ }

Queue.prototype.processStalledJob = function(job){
var _this = this;
if(this.closing){
return this.closing;
}
if(!job){
return Promise.resolve();
}else{
return scripts.getStalledJob(this, job, _this.token).then(function(isStalled){
if(isStalled){
_this.emit('stalled', job);
return _this.processJob(job, true);
}
});
}
};
Queue.prototype.processJobs = function(resolve, reject){

@@ -593,3 +647,3 @@ var _this = this;

.then(function(){
_this.emit('completed', job, data);
_this.distEmit('completed', job.toJSON(), data);
return null; // Fixes #253

@@ -605,3 +659,3 @@ });

.then(function(){
_this.emit('failed', job, error);
_this.distEmit('failed', job.toJSON(), error);
return null; // Fixes #253

@@ -834,3 +888,3 @@ });

return scripts.cleanJobsInSet(_this, type, Date.now() - grace, limit).then(function (jobs) {
_this.emit('cleaned', jobs, type);
_this.distEmit('cleaned', jobs, type);
resolve(jobs);

@@ -837,0 +891,0 @@ return null;

@@ -68,3 +68,6 @@ /**

},
addJob: function(client, toKey, lifo, job){
addJob: function(client, toKey, job, opts){
opts = opts || {};
opts.lifo = !!(opts.lifo);
var jobArgs = _.flatten(_.toPairs(job));

@@ -78,8 +81,12 @@

var argvs = _.map(jobArgs, function(arg, index){
return ', ARGV['+(index+2)+']';
return ', ARGV['+(index+3)+']';
})
var script = [
'local jobId = redis.call("INCR", KEYS[5])',
'redis.call("HMSET", ARGV[1] .. jobId' + argvs.join('') + ')',
'local jobCounter = redis.call("INCR", KEYS[5])',
'local jobId',
'if ARGV[2] == "" then jobId = jobCounter else jobId = ARGV[2] end',
'local jobIdKey = ARGV[1] .. jobId',
'if redis.call("EXISTS", jobIdKey) == 1 then return jobId end',
'redis.call("HMSET", jobIdKey' + argvs.join('') + ')',
];

@@ -92,3 +99,3 @@

script.push.apply(script, [
' local timestamp = tonumber(ARGV[' + (argvs.length + 2) + ']) * 0x1000 + bit.band(jobId, 0xfff)',
' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)',
' redis.call("ZADD", KEYS[6], timestamp, jobId)',

@@ -101,3 +108,3 @@ ' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))',

}else{
var push = (lifo ? 'R' : 'L') + 'PUSH';
var push = (opts.lifo ? 'R' : 'L') + 'PUSH';
//

@@ -113,3 +120,3 @@ // Whe check for the meta-paused key to decide if we are paused or not

'redis.call("PUBLISH", KEYS[4], jobId)',
'return jobId'
'return jobId .. ""'
]);

@@ -129,2 +136,3 @@

args.push(baseKey);
args.push(opts.customJobId || '');
args.push.apply(args, jobArgs);

@@ -135,6 +143,10 @@ args.push(delayTimestamp);

},
moveToCompleted: function(job, token){
// TODO: perfect this function so that it can be used instead
// of all the specialized functions moveToComplete, etc.
move: function(job, token, src, target){
// TODO: Depending on the source we should use LREM, SREM or ZREM.
// TODO: Depending on the target we should use LPUSH, SADD, etc.
var keys = _.map([
'active',
'completed',
src,
target,
job.jobId

@@ -145,16 +157,21 @@ ], function(name){

);
keys.push(job.lockKey());
//
// INVESTIGATE: Should'nt we check if we have the lock before trying to move the
// job?
//
var deleteJob = 'redis.call("DEL", KEYS[3])';
var moveJob = [
'redis.call("SADD", KEYS[2], ARGV[1])',
'redis.call("HSET", KEYS[3], "returnvalue", ARGV[2])',
].join('\n');
var script = [
'if redis.call("EXISTS", KEYS[3]) == 1 then',
' redis.call("SADD", KEYS[2], ARGV[1])',
' redis.call("HSET", KEYS[3], "returnvalue", ARGV[2])',
' redis.call("LREM", KEYS[1], 0, ARGV[1])',
' if redis.call("get", KEYS[4]) == ARGV[3] then', // Remove the lock
' return redis.call("del", KEYS[4])',
'if redis.call("EXISTS", KEYS[3]) == 1 then', // Make sure job exists
' local lock = redis.call("GET", KEYS[4])',
' if (not lock) or (lock == ARGV[3]) then', // Makes sure we own the lock
' redis.call("LREM", KEYS[1], -1, ARGV[1])',
target ? moveJob : deleteJob,
' redis.call("DEL", KEYS[4])',
' return 0',
' else',
' return -2',
' end',

@@ -168,3 +185,3 @@ 'else',

job.queue.client,
'moveToCompletedSet',
'move' + src + (target ? target : ''),
script,

@@ -182,7 +199,16 @@ keys.length,

return execScript.apply(scripts, args).then(function(result){
if(result === -1){
throw new Error('Missing Job ' + job.jobId + ' when trying to move from active to completed');
switch (result){
case -1:
if(src){
throw new Error('Missing Job ' + job.jobId + ' when trying to move from ' + src + ' to ' + target);
} else {
throw new Error('Missing Job ' + job.jobId + ' when trying to remove it from ' + src);
}
case -2:
throw new Error('Cannot get lock for job ' + job.jobId + ' when trying to move from ' + src);
}
});
},
moveToCompleted: function(job, token, removeOnComplete){
return scripts.move(job, token, 'active', removeOnComplete ? void 0 : 'completed');
/*

@@ -286,2 +312,38 @@ var params = {};

},
/**
* Takes a lock
*/
takeLock: function(queue, job, token, renew){
var lockCall;
if (renew){
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])';
} else {
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")';
}
var script = [
'if(' + lockCall + ') then',
// Mark the job as having been locked at least once. Used to determine if the job was stalled.
' redis.call("HSET", KEYS[2], "lockAcquired", "1")',
' return 1',
'else',
' return 0',
'end'
].join('\n');
var args = [
queue.client,
'takeLock' + (renew ? 'Renew' : ''),
script,
2,
job.lockKey(),
queue.toKey(job.jobId),
token,
queue.LOCK_RENEW_TIME
];
return execScript.apply(scripts, args);
},
releaseLock: function(job, token){

@@ -291,5 +353,5 @@ var script = [

'then',
'return redis.call("del", KEYS[1])',
' return redis.call("del", KEYS[1])',
'else',
'return 0',
' return 0',
'end'].join('\n');

@@ -362,22 +424,61 @@

/**
* Gets a stalled job by locking it and checking it is not already completed.
* Returns a "OK" if the job was locked and not in completed set.
* Looks for unlocked jobs in the active queue. There are two circumstances in which a job
* would be in 'active' but NOT have a job lock:
*
* Case A) The job was being worked on, but the worker process died and it failed to renew the lock.
* We call these jobs 'stalled'. This is the most common case. We resolve these by moving them
* back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait,
* (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to MAX_STALLED_JOB_COUNT.
* Case B) The job was just moved to 'active' from 'wait' and the worker that moved it hasn't gotten
* a lock yet, or died immediately before getting the lock (note that due to Redis limitations, the
* worker can't move the job and get the lock atomically - https://github.com/OptimalBits/bull/issues/258).
* For this case we also move the job back to 'wait' for reprocessing, but don't consider it 'stalled'
* since the job had never been started. This case is much rarer than Case A due to the very small
* timing window in which it must occur.
*/
getStalledJob: function(queue, job, token){
moveUnlockedJobsToWait: function(queue){
var script = [
'if redis.call("sismember", KEYS[1], ARGV[1]) == 0 then',
' return redis.call("set", KEYS[2], ARGV[2], "PX", ARGV[3], "NX")',
'local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])',
'local activeJobs = redis.call("LRANGE", KEYS[1], 0, -1)',
'local stalled = {}',
'local failed = {}',
'for _, job in ipairs(activeJobs) do',
' local jobKey = ARGV[2] .. job',
' if(redis.call("EXISTS", jobKey .. ":lock") == 0) then',
// Remove from the active queue.
' redis.call("LREM", KEYS[1], 1, job)',
' local lockAcquired = redis.call("HGET", jobKey, "lockAcquired")',
' if(lockAcquired) then',
// If it was previously locked then we consider it 'stalled' (Case A above). If this job
// has been stalled too many times, such as if it crashes the worker, then fail it.
' local stalledCount = redis.call("HINCRBY", jobKey, "stalledCounter", 1)',
' if(stalledCount > MAX_STALLED_JOB_COUNT) then',
' redis.call("SADD", KEYS[3], job)',
' redis.call("HSET", jobKey, "failedReason", "job stalled more than allowable limit")',
' table.insert(failed, job)',
' else',
// Move the job back to the wait queue, to immediately be picked up by a waiting worker.
' redis.call("RPUSH", KEYS[2], job)',
' table.insert(stalled, job)',
' end',
' else',
// Move the job back to the wait queue, to immediately be picked up by a waiting worker.
' redis.call("RPUSH", KEYS[2], job)',
' end',
' end',
'end',
'return 0'].join('\n');
'return {failed, stalled}'
].join('\n');
var args = [
queue.client,
'getStalledJob',
'moveUnlockedJobsToWait',
script,
2,
queue.toKey('completed'),
job.lockKey(),
job.jobId,
token,
queue.LOCK_RENEW_TIME
3,
queue.toKey('active'),
queue.toKey('wait'),
queue.toKey('failed'),
queue.MAX_STALLED_JOB_COUNT,
queue.toKey('')
];

@@ -460,2 +561,63 @@

return execScript.apply(scripts, args);
},
/**
* Attempts to reprocess a job
*
* @param {Job} job
* @param {Object} options
* @param {String} options.state The expected job state. If the job is not found
* on the provided state, then it's not reprocessed. Supported states: 'failed', 'completed'
*
* @return {Promise<Number>} Returns a promise that evaluates to a return code:
* 1 means the operation was a success
* 0 means the job does not exist
* -1 means the job is currently locked and can't be retried.
* -2 means the job was not found in the expected set
*/
reprocessJob: function(job, options) {
var push = (job.opts.lifo ? 'R' : 'L') + 'PUSH';
var script = [
'if (redis.call("EXISTS", KEYS[1]) == 1) then',
' if (redis.call("EXISTS", KEYS[2]) == 0) then',
' if (redis.call("SREM", KEYS[3], ARGV[1]) == 1) then',
' redis.call("' + push + '", KEYS[4], ARGV[1])',
' redis.call("PUBLISH", KEYS[5], ARGV[1])',
' return 1',
' else',
' return -2',
' end',
' else',
' return -1',
' end',
'else',
' return 0',
'end'
].join('\n');
var queue = job.queue;
var keys = [
queue.toKey(job.jobId),
queue.toKey(job.jobId) + ':lock',
queue.toKey(options.state),
queue.toKey('wait'),
queue.toKey('jobs')
];
var args = [
queue.client,
'retryJob',
script,
5,
keys[0],
keys[1],
keys[2],
keys[3],
keys[4],
job.jobId
];
return execScript.apply(scripts, args);
}

@@ -462,0 +624,0 @@ };

{
"name": "bull",
"version": "1.0.0",
"version": "1.1.0",
"description": "Job manager",

@@ -20,8 +20,9 @@ "main": "index.js",

"dependencies": {
"bluebird": "^3.4.1",
"bluebird": "^3.4.6",
"debuglog": "^1.0.0",
"lodash": "^4.13.1",
"disturbed": "^1.0.3",
"lodash": "^4.16.6",
"node-uuid": "^1.4.7",
"redis": "^2.6.2",
"semver": "^5.1.0"
"redis": "^2.6.3",
"semver": "^5.3.0"
},

@@ -31,5 +32,5 @@ "devDependencies": {

"gulp": "^3.9.1",
"gulp-eslint": "^2.0.0",
"gulp-eslint": "^2.1.0",
"mocha": "^2.5.3",
"sinon": "^1.17.4"
"sinon": "^1.17.6"
},

@@ -36,0 +37,0 @@ "scripts": {

@@ -5,9 +5,9 @@ Bull Job Manager

[![Join the chat at https://gitter.im/OptimalBits/bull](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/OptimalBits/bull?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![npm](https://img.shields.io/npm/dm/bull.svg?maxAge=2592000)]()
[![BuildStatus](https://secure.travis-ci.org/OptimalBits/bull.png?branch=master)](http://travis-ci.org/OptimalBits/bull)
[![NPM version](https://badge.fury.io/js/bull.svg)](http://badge.fury.io/js/bull)
![bull](http://files.softicons.com/download/animal-icons/animal-icons-by-martin-berube/png/128/bull.png)
<img src="https://image.freepik.com/free-icon/strong-bull-side-view_318-52710.jpg", width="200" />
A lightweight, robust and fast job processing queue.
The fastest, more reliable redis based queue for nodejs.
Carefully written for rock solid stability and atomicity.

@@ -22,2 +22,14 @@

Sponsors:
---------
<a href="http://mixmax.com">
<img src="https://mixmax.com/images/logo_confirmation.png" alt="Mixmax, Inc" width="100" />
</a>
<a href="http://optimalbits.com">
<img src="http://optimalbits.com/images/logo.png" />
</a>
Are you developing bull sponsored by a company? Please, let us now!
Features:

@@ -33,2 +45,3 @@ ---------

- Pause/resume (globally or locally).
- Automatic recovery from process crashes.

@@ -40,5 +53,5 @@ UIs:

[matador](https://github.com/ShaneK/Matador)
[react-bull](https://github.com/kfatehi/react-bull)
[toureiro](https://github.com/Epharmix/Toureiro)
* [matador](https://github.com/ShaneK/Matador)
* [react-bull](https://github.com/kfatehi/react-bull)
* [toureiro](https://github.com/Epharmix/Toureiro)

@@ -62,2 +75,3 @@ We also have an official UI which is at the moment bare bones project: [bull-ui](https://github.com/OptimalBits/bull-ui)

var imageQueue = Queue('image transcoding', 6379, '127.0.0.1');
var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1');

@@ -122,3 +136,3 @@ videoQueue.process(function(job, done){

return pdfAsyncProcessor();
}
});

@@ -176,4 +190,3 @@ videoQueue.add({video: 'http://example.com/video1.mov'});

.on('stalled', function(job){
// The job was considered stalled (i.e. its lock was not renewed in LOCK_RENEW_TIME).
// Useful for debugging job workers that crash or pause the event loop.
// Job that was considered stalled. Useful for debugging job workers that crash or pause the event loop.
})

@@ -251,2 +264,3 @@ .on('progress', function(job, progress){

If the process that is handling the job fails the reacquire the lock (because it hung or crashed), the job will be automatically restarted by any worker.

@@ -321,2 +335,3 @@ Useful patterns

* [Queue##close](#close)
* [Queue##getJob](#getJob)
* [Job](#job)

@@ -330,2 +345,3 @@ * [Job##remove](#remove)

###Queue(queueName, redisPort, redisHost, [redisOpts])
###Queue(queueName, redisConnectionString, [redisOpts])

@@ -345,2 +361,12 @@ This is the Queue constructor. It creates a new Queue that is persisted in

Alternatively, it's possible to pass a connection string to create a new queue.
__Arguments__
```javascript
queueName {String} A unique name for this Queue.
redisConnectionString {String} A connection string containing the redis server host, port and (optional) authentication.
redisOptions {Object} Options to pass to the redis client. https://github.com/mranney/node_redis
```
---------------------------------------

@@ -413,17 +439,26 @@

```javascript
data {PlainObject} A plain object with arguments that will be passed
to the job processing function in job.data.
opts {PlainObject} A plain object with arguments that will be passed
to the job processing function in job.opts
opts.delay {Number} An amount of miliseconds to wait until this job
can be processed. Note that for accurate delays, both server and clients
should have their clocks synchronized. [optional]
opts.attempts {Number} The total number of attempts to try the job until it completes.
opts.backoff {Number|Object} Backoff setting for automatic retries if the job fails
opts.backoff.type {String} Backoff type, which can be either `fixed` or `exponential`
opts.backoff.delay {String} Backoff delay, in milliseconds
opts.lifo {Boolean} A boolean which, if true, adds the job to the right
of the queue instead of the left (default false)
opts.timeout {Number} The number of milliseconds after which the job
should be fail with a timeout error [optional]
data {PlainObject} A plain object with arguments that will be passed to
the job processing function in job.data.
opts A plain object with arguments that will be passed to the job
processing function in job.opts.
{
delay {Number} An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both server and clients should have their clocks synchronized. [optional]
attempts {Number} The total number of attempts to try the job until it completes.
backoff {Number|Object} Backoff setting for automatic retries if the job fails
backoff.type {String} Backoff type, which can be either `fixed` or `exponential`
backoff.delay {String} Backoff delay, in milliseconds
lifo {Boolean} A boolean which, if true, adds the job to the right of the queue
instead of the left (default false)
timeout {Number} The number of milliseconds after which the job should be fail
with a timeout error [optional]
jobId {Number|String} Override the job ID - by default, the job ID is a unique
integer, but you can use this setting to override it.
If you use this option, it is up to you to ensure the
jobId is unique. If you attempt to add a job with an id that
already exists, it will not be added.
removeOnComplete {Boolean} A boolean which, if true, removes the job when it successfully
completes. Default behavior is to keep the job in the completed queue.
}
returns {Promise} A promise that resolves when the job has been succesfully

@@ -577,7 +612,5 @@ added to the queue (or rejects if some error occured). On success, the promise

<a name="clean"/>
#### Queue##clean(options)
#### Queue##clean(grace, [type], [limit])
Tells the queue remove all jobs created outside of a grace period.
You can clean the jobs with the following states: completed, waiting, active,
delayed, and failed.
Tells the queue remove jobs of a specific type created outside of a grace period.

@@ -602,2 +635,3 @@ __Example__

delayed, and failed. Defaults to completed.
limit {int} maximum amount of jobs to clean per call. If not provided will clean all matching jobs.
returns {Promise} A promise that resolves with an array of removed jobs.

@@ -604,0 +638,0 @@ ```

@@ -59,2 +59,24 @@ /*eslint-env node */

});
it('should use the custom jobId if one is provided', function() {
var customJobId = 'customjob';
return Job.create(queue, data, { jobId: customJobId }).then(function(createdJob){
expect(createdJob.jobId).to.be.equal(customJobId);
});
});
it('should process jobs with custom jobIds', function(done) {
var customJobId = 'customjob';
queue.process(function () {
return Promise.resolve();
});
queue.add({ foo: 'bar' }, { jobId: customJobId });
queue.on('completed', function(job) {
if (job.opts.jobId == customJobId) {
done();
}
});
});
});

@@ -365,12 +387,12 @@

return Job.create(queue, {foo: 'baz'}).then(function(job) {
return job.isStuck().then(function(yes) {
expect(yes).to.be(false);
return job.isStuck().then(function(isStuck) {
expect(isStuck).to.be(false);
return job.getState();
}).then(function(state) {
expect(state).to.be('waiting');
return job.moveToCompleted();
return job.move('wait', 'completed');
}).then(function (){
return job.isCompleted();
}).then(function (yes) {
expect(yes).to.be(true);
}).then(function (isCompleted) {
expect(isCompleted).to.be(true);
return job.getState();

@@ -394,4 +416,4 @@ }).then(function(state) {

return job.isFailed();
}).then(function (yes) {
expect(yes).to.be(true);
}).then(function (isFailed) {
expect(isFailed).to.be(true);
return job.getState();

@@ -405,3 +427,3 @@ }).then(function(state) {

}).then(function(state) {
expect(state).to.be('waiting');
expect(state).to.be('stuck');
return client.rpopAsync(queue.toKey('wait'));

@@ -412,4 +434,4 @@ }).then(function(){

return job.isPaused();
}).then(function (yes) {
expect(yes).to.be(true);
}).then(function (isPaused) {
expect(isPaused).to.be(true);
return job.getState();

@@ -423,4 +445,4 @@ }).then(function(state) {

return job.isWaiting();
}).then(function (yes) {
expect(yes).to.be(true);
}).then(function (isWaiting) {
expect(isWaiting).to.be(true);
return job.getState();

@@ -433,2 +455,61 @@ }).then(function(state) {

describe('.finished', function() {
it('should resolve when the job has been completed', function(done){
queue.process(function () {
return Promise.resolve();
});
queue.add({ foo: 'bar' }).then(function(job){
return job.finished();
}).then(function(){
done();
}, done);
});
it('should reject when the job has been completed', function(done){
queue.process(function () {
return Promise.reject(Error('test error'));
});
queue.add({ foo: 'bar' }).then(function(job){
return job.finished();
}).then(function(){
done(Error('should have been rejected'));
}, function(err){
expect(err.message).equal('test error');
done();
});
});
it('should resolve directly if already processed', function(done){
queue.process(function () {
return Promise.resolve();
});
queue.add({ foo: 'bar' }).then(function(job){
return Promise.delay(1500).then(function(){
return job.finished();
})
}).then(function(){
done();
}, done);
});
it('should reject directly if already processed', function(done){
queue.process(function () {
return Promise.reject(Error('test error'));
});
queue.add({ foo: 'bar' }).then(function(job){
return Promise.delay(1500).then(function(){
return job.finished();
});
}).then(function(){
done(Error('should have been rejected'));
}, function(err){
expect(err.message).equal('test error');
done();
});
});
it.skip('should resolve using the watchdog if pubsub was lost');
it.skip('should reject using the watchdog if pubsub was lost');
});
});

@@ -24,3 +24,3 @@ /// <reference path='../typings/mocha/mocha.d.ts'/>

describe('Priority queue', function(){
describe.skip('Priority queue', function(){
var queue;

@@ -27,0 +27,0 @@ var sandbox = sinon.sandbox.create();

@@ -33,3 +33,3 @@ /*eslint-env node */

beforeEach(function(){
beforeEach(function () {
var client = redis.createClient();

@@ -39,3 +39,3 @@ return client.flushdbAsync();

afterEach(function(){
afterEach(function () {
sandbox.restore();

@@ -47,3 +47,3 @@ });

beforeEach(function () {
return utils.newQueue('test').then(function(queue){
return utils.newQueue('test').then(function (queue) {
testQueue = queue;

@@ -54,3 +54,3 @@ });

it('should call end on the client', function (done) {
testQueue.client.once('end', function(){
testQueue.client.once('end', function () {
done();

@@ -69,3 +69,3 @@ });

it('should call end on the event subscriber client', function (done) {
testQueue.eclient.once('end', function(){
testQueue.eclient.once('end', function () {
done();

@@ -129,3 +129,3 @@ });

testQueue.on('completed', function(){
testQueue.on('completed', function () {
testQueue.close().then(done);

@@ -160,2 +160,20 @@ });

it('should create a queue with a redis connection string', function (done) {
var queue = new Queue('connstring', 'redis://127.0.0.1:6379');
queue.once('ready', function () {
expect(queue.client.connection_options.host).to.be('127.0.0.1');
expect(queue.bclient.connection_options.host).to.be('127.0.0.1');
expect(queue.client.connection_options.port).to.be(6379);
expect(queue.bclient.connection_options.port).to.be(6379);
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
queue.close().then(done);
});
});
it('creates a queue using the supplied redis DB', function (done) {

@@ -203,3 +221,3 @@ var queue = new Queue('custom', { redis: { DB: 1 } });

});
}).then(function(){
}).then(function () {
return queue.close();

@@ -213,7 +231,7 @@ });

beforeEach(function(){
beforeEach(function () {
var client = redis.createClient();
return client.flushdbAsync().then(function(){
return client.flushdbAsync().then(function () {
return utils.newQueue();
}).then(function(_queue){
}).then(function (_queue) {
queue = _queue;

@@ -223,3 +241,3 @@ });

afterEach(function(){
afterEach(function () {
return utils.cleanupQueues();

@@ -241,14 +259,12 @@ });

it('process a lifo queue', function (done) {
this.timeout(12000);
this.timeout(3000);
var currentValue = 0, first = true;
utils.newQueue('test lifo').then(function(queue2){
utils.newQueue('test lifo').then(function (queue2) {
queue2.process(function (job, jobDone) {
// Catching the job before the pause
expect(job.data.count).to.be.equal(currentValue--);
if(first) {
jobDone();
if (first) {
first = false;
return jobDone();
}
jobDone();
if(currentValue === 0) {
} else if (currentValue === 0) {
done();

@@ -258,13 +274,10 @@ }

// Add a job to pend proccessing
queue2.add({ 'count': 0 }).then(function () {
queue2.pause().then(function () {
// Add a series of jobs in a predictable order
var fn = function (cb) {
queue2.add({ 'count': ++currentValue }, { 'lifo': true }).then(cb);
};
fn(fn(fn(fn(function () {
queue2.resume();
}))));
});
queue2.pause().then(function () {
// Add a series of jobs in a predictable order
var fn = function (cb) {
queue2.add({ 'count': ++currentValue }, { 'lifo': true }).then(cb);
};
fn(fn(fn(fn(function () {
queue2.resume();
}))));
});

@@ -283,3 +296,3 @@ });

jobDone();
if(counter === maxJobs) {
if (counter === maxJobs) {
done();

@@ -290,3 +303,3 @@ }

for(var i = 1; i <= maxJobs; i++) {
for (var i = 1; i <= maxJobs; i++) {
queue.add({ foo: 'bar', num: i });

@@ -359,3 +372,3 @@ }

expect(job.data.foo).to.be.equal('bar');
return Promise.delay(250).then(function(){
return Promise.delay(250).then(function () {
return 'my data';

@@ -413,3 +426,3 @@ });

this.timeout(12000);
utils.newQueue('test queue stalled').then(function(queueStalled){
utils.newQueue('test queue stalled').then(function (queueStalled) {
queueStalled.LOCK_RENEW_TIME = 10;

@@ -427,5 +440,5 @@ var jobs = [

return queueStalled.close(true).then(function(){
return new Promise(function(resolve) {
utils.newQueue('test queue stalled').then(function(queue2){
return queueStalled.close(true).then(function () {
return new Promise(function (resolve) {
utils.newQueue('test queue stalled').then(function (queue2) {
queue2.LOCK_RENEW_TIME = 100;

@@ -458,3 +471,3 @@ var doneAfterFour = _.after(4, function () {

it('processes jobs that were added before the queue backend started', function () {
utils.newQueue('test queue added before').then(function(queueStalled){
utils.newQueue('test queue added before').then(function (queueStalled) {
queueStalled.LOCK_RENEW_TIME = 10;

@@ -471,3 +484,3 @@ var jobs = [

.then(function () {
utils.newQueue('test queue added before').then(function(queue2){
utils.newQueue('test queue added before').then(function (queue2) {
queue2.process(function (job, jobDone) {

@@ -494,3 +507,3 @@ jobDone();

for(var i = 0; i < NUM_QUEUES; i++) {
for (var i = 0; i < NUM_QUEUES; i++) {
var queueStalled2 = new Queue('test queue stalled 2', 6379, '127.0.0.1');

@@ -500,3 +513,3 @@ stalledQueues.push(queueStalled2);

for(var j = 0; j < NUM_JOBS_PER_QUEUE; j++) {
for (var j = 0; j < NUM_JOBS_PER_QUEUE; j++) {
jobs.push(queueStalled2.add({ job: j }));

@@ -514,3 +527,3 @@ }

processed++;
if(processed === stalledQueues.length) {
if (processed === stalledQueues.length) {
setTimeout(function () {

@@ -525,3 +538,3 @@ var queue2 = new Queue('test queue stalled 2', 6379, '127.0.0.1');

counter++;
if(counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) {
if (counter === NUM_QUEUES * NUM_JOBS_PER_QUEUE) {
queue2.close().then(done);

@@ -536,3 +549,3 @@ }

var processes = [];
for(var k = 0; k < stalledQueues.length; k++) {
for (var k = 0; k < stalledQueues.length; k++) {
processes.push(stalledQueues[k].process(procFn));

@@ -547,3 +560,8 @@ }

var err = null;
var anotherQueue;
queue.on('completed', function () {
utils.cleanupQueue(anotherQueue).then(done.bind(null, err));
});
queue.add({ foo: 'bar' }).then(function (addedJob) {

@@ -553,3 +571,3 @@ queue.process(function (job, jobDone) {

if(addedJob.jobId !== job.jobId) {
if (addedJob.jobId !== job.jobId) {
err = new Error('Processed job id does not match that of added job');

@@ -559,4 +577,6 @@ }

});
setTimeout(function () {
utils.newQueue().then(function(anotherQueue){
utils.newQueue().then(function (_anotherQueue) {
anotherQueue = _anotherQueue;
setTimeout(function () {
anotherQueue.process(function (job, jobDone) {

@@ -566,8 +586,4 @@ err = new Error('The second queue should not have received a job to process');

});
queue.on('completed', function () {
utils.cleanupQueue(anotherQueue).then(done.bind(null, err));
});
});
}, 50);
}, 50);
});
});

@@ -581,3 +597,3 @@ });

var collect = _.after(2, function(){
var collect = _.after(2, function () {
queue2.close().then(done);

@@ -652,3 +668,3 @@ });

it('process a job that returns data with a circular dependency', function(done){
it('process a job that returns data with a circular dependency', function (done) {
queue.on('error', function (err) {

@@ -721,3 +737,3 @@ done(err);

called++;
if(called % 2 !== 0) {
if (called % 2 !== 0) {
throw new Error('Not even!');

@@ -751,3 +767,3 @@ }

for(var i = 1; i <= maxJobs; i++) {
for (var i = 1; i <= maxJobs; i++) {
added.push(queue.add({ foo: 'bar', num: i }));

@@ -779,3 +795,3 @@ }

describe('.pause', function () {
beforeEach(function(){
beforeEach(function () {
var client = redis.createClient();

@@ -788,3 +804,3 @@ return client.flushdbAsync();

utils.newQueue().then(function(queue){
utils.newQueue().then(function (queue) {
var resultPromise = new Promise(function (resolve) {

@@ -796,3 +812,3 @@ queue.process(function (job, jobDone) {

counter--;
if(counter === 0) {
if (counter === 0) {
resolve(queue.close());

@@ -818,3 +834,3 @@ }

utils.newQueue().then(function(queue){
utils.newQueue().then(function (queue) {
queue.process(function (job, jobDone) {

@@ -825,7 +841,7 @@ expect(ispaused).to.be(false);

if(first) {
if (first) {
first = false;
ispaused = true;
queue.pause();
}else{
} else {
expect(isresumed).to.be(true);

@@ -850,3 +866,3 @@ queue.close().then(done);

it('should pause the queue locally', function(testDone){
it('should pause the queue locally', function (testDone) {
var counter = 2;

@@ -856,19 +872,19 @@

queue.pause(true /* Local */).then(function(){
queue.pause(true /* Local */).then(function () {
// Add the worker after the queue is in paused mode since the normal behavior is to pause
// it after the current lock expires. This way, we can ensure there isn't a lock already
// to test that pausing behavior works.
queue.process(function(job, done){
queue.process(function (job, done) {
expect(queue.paused).not.to.be.ok();
done();
counter--;
if(counter === 0){
if (counter === 0) {
queue.close().then(testDone);
}
});
}).then(function(){
}).then(function () {
return queue.add({ foo: 'paused' });
}).then(function(){
}).then(function () {
return queue.add({ foo: 'paused' });
}).then(function(){
}).then(function () {
expect(counter).to.be(2);

@@ -888,8 +904,8 @@ expect(queue.paused).to.be.ok(); // Parameter should exist.

var jobs = [];
for(var i = 0; i < 10; i++) {
for (var i = 0; i < 10; i++) {
jobs.push(queue.add(i));
}
Promise.all(jobs).then(function() {
queue.pause(true).then(function() {
var active = queue.getJobCountByTypes(['active']).then(function(count) {
Promise.all(jobs).then(function () {
queue.pause(true).then(function () {
var active = queue.getJobCountByTypes(['active']).then(function (count) {
expect(count).to.be(0);

@@ -901,3 +917,3 @@ expect(queue.paused).to.be.ok();

// One job from the 10 posted above will be processed, so we expect 9 jobs pending
var paused = queue.getJobCountByTypes(['wait', 'delayed']).then(function(count) {
var paused = queue.getJobCountByTypes(['wait', 'delayed']).then(function (count) {
expect(count).to.be(9);

@@ -909,6 +925,6 @@ return null;

}).then(function() {
}).then(function () {
return queue.add({});
}).then(function() {
var active = queue.getJobCountByTypes(['active']).then(function(count) {
}).then(function () {
var active = queue.getJobCountByTypes(['active']).then(function (count) {
expect(count).to.be(0);

@@ -918,3 +934,3 @@ return null;

var paused = queue.getJobCountByTypes(['wait', 'delayed']).then(function(count) {
var paused = queue.getJobCountByTypes(['wait', 'delayed']).then(function (count) {
expect(count).to.be(10);

@@ -925,3 +941,3 @@ return null;

return Promise.all([active, paused]);
}).then(function() {
}).then(function () {
return queue.close().then(done, done);

@@ -965,3 +981,3 @@ });

beforeEach(function(){
beforeEach(function () {
var client = redis.createClient();

@@ -1016,7 +1032,7 @@ return client.flushdbAsync();

queue.on('failed', function(err){
queue.on('failed', function (err) {
done(err);
});
queue.on('ready', function() {
queue.on('ready', function () {

@@ -1027,3 +1043,3 @@ queue.process(function (job, jobDone) {

jobDone();
if(order === 10) {
if (order === 10) {
queue.close().then(done, done);

@@ -1059,3 +1075,3 @@ }

if(order === 4) {
if (order === 4) {
queue.close().then(done, done);

@@ -1102,3 +1118,3 @@ }

if(order === 12) {
if (order === 12) {
queue.close().then(done, done);

@@ -1110,7 +1126,7 @@ }

queue.on('ready', function() {
queue.on('ready', function () {
var now = Date.now();
var _promises = [];
var _i = 1;
for(_i; _i <= 12; _i++){
for (_i; _i <= 12; _i++) {
_promises.push(queue.add({ order: _i }, {

@@ -1131,3 +1147,3 @@ delay: 1000,

beforeEach(function(){
beforeEach(function () {
var client = redis.createClient();

@@ -1138,3 +1154,3 @@ queue = utils.buildQueue();

afterEach(function(){
afterEach(function () {
return queue.close();

@@ -1203,3 +1219,3 @@ });

if(++i === 4){
if (++i === 4) {
queue.pause().then(function () {

@@ -1215,3 +1231,3 @@ Promise.delay(500).then(function () { // Wait for all the active jobs to finalize.

// They had a bug in pause() with this special case.
if(i % 3 === 0){
if (i % 3 === 0) {
error = new Error();

@@ -1243,12 +1259,12 @@ }

describe('Retries and backoffs', function() {
describe('Retries and backoffs', function () {
var queue;
afterEach(function(){
afterEach(function () {
return queue.close();
});
it('should automatically retry a failed job if attempts is bigger than 1', function(done) {
it('should automatically retry a failed job if attempts is bigger than 1', function (done) {
queue = utils.buildQueue('test retries and backoffs');
queue.on('ready', function() {
queue.on('ready', function () {

@@ -1258,3 +1274,3 @@ var tries = 0;

tries++;
if(job.attemptsMade < 2){
if (job.attemptsMade < 2) {
throw new Error('Not yet!');

@@ -1270,3 +1286,3 @@ }

});
queue.on('completed', function() {
queue.on('completed', function () {
done();

@@ -1276,9 +1292,9 @@ });

it('should not retry a failed job more than the number of given attempts times', function(done) {
it('should not retry a failed job more than the number of given attempts times', function (done) {
queue = utils.buildQueue('test retries and backoffs');
var tries = 0;
queue.on('ready', function() {
queue.on('ready', function () {
queue.process(function (job, jobDone) {
tries++;
if(job.attemptsMade < 3){
if (job.attemptsMade < 3) {
throw new Error('Not yet!');

@@ -1294,7 +1310,7 @@ }

});
queue.on('completed', function() {
queue.on('completed', function () {
done(new Error('Failed job was retried more than it should be!'));
});
queue.on('failed', function() {
if(tries === 3){
queue.on('failed', function () {
if (tries === 3) {
done();

@@ -1305,9 +1321,9 @@ }

it('should retry a job after a delay if a fixed backoff is given', function(done) {
it('should retry a job after a delay if a fixed backoff is given', function (done) {
this.timeout(12000);
queue = utils.buildQueue('test retries and backoffs');
var start;
queue.on('ready', function() {
queue.on('ready', function () {
queue.process(function (job, jobDone) {
if(job.attemptsMade < 2){
if (job.attemptsMade < 2) {
throw new Error('Not yet!');

@@ -1324,3 +1340,3 @@ }

});
queue.on('completed', function() {
queue.on('completed', function () {
var elapse = Date.now() - start;

@@ -1332,9 +1348,9 @@ expect(elapse).to.be.greaterThan(2000);

it('should retry a job after a delay if an exponential backoff is given', function(done) {
it('should retry a job after a delay if an exponential backoff is given', function (done) {
this.timeout(12000);
queue = utils.buildQueue('test retries and backoffs');
var start;
queue.on('ready', function() {
queue.on('ready', function () {
queue.process(function (job, jobDone) {
if(job.attemptsMade < 2){
if (job.attemptsMade < 2) {
throw new Error('Not yet!');

@@ -1354,3 +1370,3 @@ }

});
queue.on('completed', function() {
queue.on('completed', function () {
var elapse = Date.now() - start;

@@ -1363,2 +1379,120 @@ var expected = 1000 * (Math.pow(2, 2) - 1);

it('should not retry a job that has been removed', function (done) {
queue = utils.buildQueue('retry a removed job');
queue.on('ready', function () {
var attempts = 0;
queue.process(function (job, jobDone) {
if (attempts === 0) {
attempts++;
throw new Error('failed');
} else {
jobDone();
}
});
queue.add({ foo: 'bar' });
});
var failedHandler = _.once(function (job, err) {
expect(job.data.foo).to.equal('bar');
expect(err.message).to.equal('failed');
job.retry().delay(100)
.then(function () {
return queue.getCompletedCount().then(function (count) {
return expect(count).to.equal(1);
});
})
.then(function () {
return queue.clean(0).then(function () {
return job.retry().catch(function (err) {
expect(err.message).to.equal('Couldn\'t retry job: The job doesn\'t exist');
});
});
})
.then(function () {
return Promise.all([
queue.getCompletedCount().then(function (count) {
return expect(count).to.equal(0);
}),
queue.getFailedCount().then(function (count) {
return expect(count).to.equal(0);
})
]);
})
.then(function () {
done();
}, done);
});
queue.on('failed', failedHandler);
});
it('should not retry a job that has been retried already', function (done) {
queue = utils.buildQueue('retry already retried job');
queue.on('ready', function () {
var attempts = 0;
queue.process(function (job, jobDone) {
if (attempts === 0) {
attempts++;
throw new Error('failed');
} else {
jobDone();
}
});
queue.add({ foo: 'bar' });
});
var failedHandler = _.once(function (job, err) {
expect(job.data.foo).to.equal('bar');
expect(err.message).to.equal('failed');
job.retry().delay(100)
.then(function () {
return queue.getCompletedCount().then(function (count) {
return expect(count).to.equal(1);
});
})
.then(function () {
return job.retry().catch(function (err) {
expect(err.message).to.equal('Couldn\'t retry job: The job has been already retried or has not failed');
});
})
.then(function () {
return Promise.all([
queue.getCompletedCount().then(function (count) {
return expect(count).to.equal(1);
}),
queue.getFailedCount().then(function (count) {
return expect(count).to.equal(0);
})
]);
})
.then(function () {
done();
}, done);
});
queue.on('failed', failedHandler);
});
it('should not retry a job that is locked', function (done) {
queue = utils.buildQueue('retry a locked job');
var addedHandler = _.once(function (job) {
expect(job.data.foo).to.equal('bar');
job.retry().catch(function (err) {
expect(err.message).to.equal('Couldn\'t retry job: The job has been already retried or has not failed');
return null;
}).then(done, done);
});
queue.on('ready', function () {
queue.process(function (job, jobDone) {
return Promise.delay(200).then(jobDone);
});
queue.add({ foo: 'bar' }).then(addedHandler);
});
});
});

@@ -1369,3 +1503,3 @@

beforeEach(function(){
beforeEach(function () {
queue = utils.buildQueue();

@@ -1375,3 +1509,3 @@ return queue.clean(1000);

afterEach(function(){
afterEach(function () {
return queue.close();

@@ -1426,3 +1560,3 @@ });

if(counter === 0){
if (counter === 0) {
queue.getCompleted().then(function (jobs) {

@@ -1451,3 +1585,3 @@ expect(jobs).to.be.a('array');

if(counter === 0){
if (counter === 0) {
queue.getFailed().then(function (jobs) {

@@ -1486,7 +1620,7 @@ expect(jobs).to.be.a('array');

describe('getJobs', function() {
describe('getJobs', function () {
this.timeout(12000);
var queue;
beforeEach(function(){
beforeEach(function () {
queue = utils.buildQueue();

@@ -1496,13 +1630,13 @@ return queue.clean(1000);

afterEach(function(){
afterEach(function () {
return queue.close();
});
it('should return all completed jobs when not setting start/end', function(done) {
queue.process(function(job, completed) {
it('should return all completed jobs when not setting start/end', function (done) {
queue.process(function (job, completed) {
completed();
});
queue.on('completed', _.after(3, function() {
queue.getJobs('completed', 'SET').then(function(jobs) {
queue.on('completed', _.after(3, function () {
queue.getJobs('completed', 'SET').then(function (jobs) {
expect(jobs).to.be.an(Array);

@@ -1519,9 +1653,9 @@ expect(jobs).to.have.length(3);

it('should return all failed jobs when not setting start/end', function(done) {
queue.process(function(job, completed) {
it('should return all failed jobs when not setting start/end', function (done) {
queue.process(function (job, completed) {
completed(new Error('error'));
});
queue.on('failed', _.after(3, function() {
queue.getJobs('failed', 'SET').then(function(jobs) {
queue.on('failed', _.after(3, function () {
queue.getJobs('failed', 'SET').then(function (jobs) {
expect(jobs).to.be.an(Array);

@@ -1538,9 +1672,9 @@ expect(jobs).to.have.length(3);

it('should return subset of jobs when setting positive range', function(done) {
queue.process(function(job, completed) {
it('should return subset of jobs when setting positive range', function (done) {
queue.process(function (job, completed) {
completed();
});
queue.on('completed', _.after(3, function() {
queue.getJobs('completed', 'SET', 1, 2).then(function(jobs) {
queue.on('completed', _.after(3, function () {
queue.getJobs('completed', 'SET', 1, 2).then(function (jobs) {
expect(jobs).to.be.an(Array);

@@ -1559,9 +1693,9 @@ expect(jobs).to.have.length(2);

it('should return subset of jobs when setting a negative range', function(done) {
queue.process(function(job, completed) {
it('should return subset of jobs when setting a negative range', function (done) {
queue.process(function (job, completed) {
completed();
});
queue.on('completed', _.after(3, function() {
queue.getJobs('completed', 'SET', -3, -1).then(function(jobs) {
queue.on('completed', _.after(3, function () {
queue.getJobs('completed', 'SET', -3, -1).then(function (jobs) {
expect(jobs).to.be.an(Array);

@@ -1581,9 +1715,9 @@ expect(jobs).to.have.length(3);

it('should return subset of jobs when range overflows', function(done) {
queue.process(function(job, completed) {
it('should return subset of jobs when range overflows', function (done) {
queue.process(function (job, completed) {
completed();
});
queue.on('completed', _.after(3, function() {
queue.getJobs('completed', 'SET', -300, 99999).then(function(jobs) {
queue.on('completed', _.after(3, function () {
queue.getJobs('completed', 'SET', -300, 99999).then(function (jobs) {
expect(jobs).to.be.an(Array);

@@ -1612,7 +1746,7 @@ expect(jobs).to.have.length(3);

afterEach(function(){
afterEach(function () {
return queue.close();
});
it('should reject the cleaner with no grace', function(done){
it('should reject the cleaner with no grace', function (done) {
queue.clean().then(function () {

@@ -1648,4 +1782,4 @@ done(new Error('Promise should not resolve'));

it('should clean two jobs from the queue', function (done) {
queue.add({some: 'data'});
queue.add({some: 'data'});
queue.add({ some: 'data' });
queue.add({ some: 'data' });
queue.process(function (job, jobDone) {

@@ -1668,6 +1802,6 @@ jobDone();

});
queue.add({some: 'data'});
queue.add({some: 'data'});
queue.add({ some: 'data' });
queue.add({ some: 'data' });
Promise.delay(200).then(function () {
queue.add({some: 'data'});
queue.add({ some: 'data' });
queue.clean(100);

@@ -1685,4 +1819,4 @@ }).delay(100).then(function () {

it('should clean all failed jobs', function (done) {
queue.add({some: 'data'});
queue.add({some: 'data'});
queue.add({ some: 'data' });
queue.add({ some: 'data' });
queue.process(function (job, jobDone) {

@@ -1696,3 +1830,3 @@ jobDone(new Error('It failed'));

return queue.count();
}).then(function(len) {
}).then(function (len) {
expect(len).to.be(0);

@@ -1704,4 +1838,4 @@ done();

it('should clean all waiting jobs', function (done) {
queue.add({some: 'data'});
queue.add({some: 'data'});
queue.add({ some: 'data' });
queue.add({ some: 'data' });
Promise.delay(100).then(function () {

@@ -1712,3 +1846,3 @@ return queue.clean(0, 'wait');

return queue.count();
}).then(function(len) {
}).then(function (len) {
expect(len).to.be(0);

@@ -1720,4 +1854,4 @@ done();

it('should clean all delayed jobs', function (done) {
queue.add({some: 'data'}, { delay: 5000 });
queue.add({some: 'data'}, { delay: 5000 });
queue.add({ some: 'data' }, { delay: 5000 });
queue.add({ some: 'data' }, { delay: 5000 });
Promise.delay(100).then(function () {

@@ -1728,3 +1862,3 @@ return queue.clean(0, 'delayed');

return queue.count();
}).then(function(len) {
}).then(function (len) {
expect(len).to.be(0);

@@ -1736,5 +1870,5 @@ done();

it('should clean the number of jobs requested', function (done) {
queue.add({some: 'data'});
queue.add({some: 'data'});
queue.add({some: 'data'});
queue.add({ some: 'data' });
queue.add({ some: 'data' });
queue.add({ some: 'data' });
Promise.delay(100).then(function () {

@@ -1745,3 +1879,3 @@ return queue.clean(0, 'wait', 1);

return queue.count();
}).then(function(len) {
}).then(function (len) {
expect(len).to.be(2);

@@ -1755,4 +1889,4 @@ done();

queue.add({some: 'data'});
queue.add({some: 'data'});
queue.add({ some: 'data' });
queue.add({ some: 'data' });
queue.process(function (job, jobDone) {

@@ -1763,6 +1897,6 @@ jobDone(new Error('It failed'));

Promise.delay(100).then(function () {
return new Promise(function(resolve) {
return new Promise(function (resolve) {
client.hdel('bull:' + queue.name + ':1', 'timestamp', resolve);
});
}).then(function() {
}).then(function () {
return queue.clean(0, 'failed');

@@ -1772,3 +1906,3 @@ }).then(function (jobs) {

return queue.getFailed();
}).then(function(failed) {
}).then(function (failed) {
expect(failed.length).to.be(0);

@@ -1779,2 +1913,2 @@ done();

});
});
});

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc