Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 3.0.0-alpha.2 to 3.0.0-alpha.3

examples/cluster-queue.js

5

CHANGELOG.md

@@ -0,1 +1,6 @@

v.3.0.0-alpha.2
===============
- Eliminated possible memory leak #503
v.3.0.0-alpha.1

@@ -2,0 +7,0 @@ ===============

6

lib/commands/index.js

@@ -27,3 +27,3 @@ /**

return loadScripts(client, __dirname);
}
};

@@ -42,6 +42,6 @@ function loadScripts(client, dir) {

}, function(err){
console.log('Error reading script file', err)
console.error('Error reading script file', err);
});
}));
});
};
}

@@ -1,3 +0,1 @@

module.exports.Messages = {

@@ -7,2 +5,2 @@ RETRY_JOB_NOT_EXIST: 'Couldn\'t retry job: The job doesn\'t exist',

RETRY_JOB_NOT_FAILED: 'Couldn\'t retry job: The job has been already retried or has not failed'
}
};

@@ -58,3 +58,3 @@ /*eslint-env node */

priority: opts.priority
});
}, queue.token);
}

@@ -67,3 +67,2 @@

job.id = jobId;
queue.distEmit('waiting', job, null);
debuglog('Job added', jobId);

@@ -90,11 +89,7 @@ return job;

Job.prototype.progress = function(progress){
if(progress){
var _this = this;
this._progress = progress;
return this.queue.client.hset(this.queue.toKey(this.id), 'progress', progress).then(function(){
_this.queue.distEmit('progress', _this, progress);
});
}else{
if(_.isUndefined(progress)){
return this._progress;
}
this._progress = progress;
return scripts.updateProgress(this, progress);
};

@@ -146,9 +141,2 @@

/**
Renews a lock so that it gets some more time before expiring.
*/
Job.prototype.renewLock = function(){
return this.takeLock(true /* Renew */);
};
/**
Releases the lock. Only locks owned by the queue instance can be released.

@@ -172,3 +160,3 @@ */

this._discarded = true;
}
};

@@ -196,3 +184,3 @@ Job.prototype.moveToFailed = function(err, ignoreLock){

if(result === -1){
reject(new Error('Missing Job ' + jobId + ' during retry'));
reject(new Error('Missing Job ' + _this.id + ' during retry'));
}

@@ -204,3 +192,3 @@ }

// If not, move to failed
var args = scripts.moveToFailedArgs(_this, err.message, _this.opts.removeOnFail, ignoreLock)
var args = scripts.moveToFailedArgs(_this, err.message, _this.opts.removeOnFail, ignoreLock);
multi.moveToFinished(args);

@@ -253,7 +241,5 @@ }

Job.prototype.retry = function(){
var queue = this.queue;
var _this = this;
return scripts.reprocessJob(this, { state: 'failed' }).then(function(result) {
if (result === 1) {
queue.distEmit('waiting', _this, null);
return;
} else if (result === 0) {

@@ -341,2 +327,8 @@ throw new Error(errors.Messages.RETRY_JOB_NOT_EXIST);

function getFailed(){
return Job.fromId(_this.queue, _this.id).then(function(job){
throw new Error(job.failedReason);
});
}
return scripts.isFinished(_this).then(function(status){

@@ -346,5 +338,3 @@ var finished = status > 0;

if(status == 2){
return Job.fromId(_this.queue, _this.id, 'failedReason').then(function(data){
throw Error(data.failedReason);
});
return getFailed();
}

@@ -358,22 +348,21 @@ }else{

removeListeners();
clearInterval(interval);
}
}
function onFailed(job, err){
function onFailed(job, failedReason){
if(String(job.id) === String(_this.id)){
reject(err);
reject(new Error(failedReason));
removeListeners();
clearInterval(interval);
}
}
_this.queue.on('global:completed', onCompleted);
_this.queue.on('global:failed', onFailed);
function removeListeners(){
_this.queue.removeListener('completed', onCompleted);
_this.queue.removeListener('failed', onFailed);
clearInterval(interval);
_this.queue.removeListener('global:completed', onCompleted);
_this.queue.removeListener('global:failed', onFailed);
}
_this.queue.on('completed', onCompleted);
_this.queue.on('failed', onFailed);
//

@@ -383,13 +372,17 @@ // Watchdog

interval = setInterval(function(){
status(resolve, reject).then(function(finished){
scripts.isFinished(_this).then(function(status){
var finished = status > 0;
if(finished){
if(status == 2){
getFailed().then(resolve, reject);
}
removeListeners();
clearInterval(interval);
resolve();
}
})
});
}, FINISHED_WATCHDOG);
});
};
}
});
}
};

@@ -479,4 +472,4 @@ // -----------------------------------------------------------------------------

return job;
}
};
module.exports = Job;

@@ -5,5 +5,5 @@ /*eslint-env node */

var redis = require('ioredis');
var Disturbed = require('disturbed');
var EventEmitter = require('events');
var util = require('util');
var assert = require('assert');
var url = require('url');

@@ -54,61 +54,59 @@ var Job = require('./job');

var LOCK_DURATION = 5000; // 5 seconds is the duration of the lock.
var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed
// The interval for which to check for stalled jobs.
var STALLED_JOB_CHECK_INTERVAL = 5000; // 5 seconds is the renew time.
/*
interface QueueOptions {
prefix?: string = 'bull',
redis : RedisOpts, // ioredis defaults
createClient?: (type: enum('client', 'subscriber'), redisOpts?: RedisOpts) => redisClient,
// The maximum number of times a job can be recovered from the 'stalled' state
// (moved back to 'wait'), before it is failed.
var MAX_STALLED_JOB_COUNT = 1;
// Advanced settings
settings?: QueueSettings {
lockDuration?: number = 30000,
lockRenewTime?: number = lockDuration / 2,
stalledInterval?: number = 30000,
maxStalledCount?: number = 1, // The maximum number of times a job can be recovered from the 'stalled' state
guardInterval?: number = 5000,
retryProcessDelay?: number = 5000
}
}
*/
var POLLING_INTERVAL = 5000;
// Queue(name: string, url?, opts?)
var Queue = function Queue(name, url, opts){
var _this = this;
if(!(this instanceof Queue)){
return new Queue(name, url, opts);
}
var RETRY_PROCESS_DELAY = 5000;
if(_.isString(url)){
opts = _.defaults(redisOptsFromUrl, opts);
}else{
opts = url;
}
var REDLOCK_DRIFT_FACTOR = 0.01;
var REDLOCK_RETRY_COUNT = 0;
var REDLOCK_RETRY_DELAY = 200;
opts = opts || {};
var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed
if(opts && !_.isObject(opts)){
throw Error('Options must be a valid object');
}
var redisOpts = opts.redis || {};
var Queue = function Queue(name, redisPort, redisHost, redisOptions){
if(!(this instanceof Queue)){
return new Queue(name, redisPort, redisHost, redisOptions);
}
if(_.isObject(redisPort)) {
var opts = redisPort;
var redisOpts = opts.redis || {};
redisPort = redisOpts.port;
redisHost = redisOpts.host;
redisOptions = redisOpts.opts || {};
redisOptions.db = redisOpts.DB || redisOpts.DB;
} else if(parseInt(redisPort) == redisPort) {
redisPort = parseInt(redisPort);
redisOptions = redisOptions || {};
} else if(_.isString(redisPort)) {
try {
var redisUrl = url.parse(redisPort);
assert(_.isObject(redisHost) || _.isUndefined(redisHost),
'Expected an object as redis option');
redisOptions = redisHost || {};
redisPort = redisUrl.port;
redisHost = redisUrl.hostname;
if (redisUrl.auth) {
redisOptions.password = redisUrl.auth.split(':')[1];
}
} catch (e) {
throw new Error(e.message);
_.defaults(redisOpts, {
port: 6379,
host: '127.0.0.1',
db: redisOpts.db || redisOpts.DB,
retryStrategy: function (times) {
var delay = Math.min(Math.exp(times), 20000);
return delay;
}
}
});
redisOptions = redisOptions || {};
function createClient(type) {
function createClient(type, redisOpts) {
var client;
if(_.isFunction(redisOptions.createClient)){
client = redisOptions.createClient(type);
if(_.isFunction(opts.createClient)){
client = opts.createClient(type, redisOpts);
}else{
client = new redis(redisPort, redisHost, redisOptions);
client = new redis(redisOpts);
}

@@ -118,16 +116,11 @@ return client;

redisPort = redisPort || 6379;
redisHost = redisHost || '127.0.0.1';
var _this = this;
this.name = name;
this.keyPrefix = redisOptions.keyPrefix || 'bull';
this.keyPrefix = redisOpts.keyPrefix || opts.prefix || 'bull';
this.token = uuid();
//
// We cannot use ioredis keyPrefix feature until we
// stop creating keys dynamically in lua scripts.
// We cannot use ioredis keyPrefix feature since we
// create keys dynamically in lua scripts.
//
delete redisOptions.keyPrefix;
delete redisOpts.keyPrefix;

@@ -137,3 +130,3 @@ //

//
this.client = createClient('client');
this.client = createClient('client', redisOpts);

@@ -149,19 +142,5 @@ getRedisVersion(this.client).then(function(version){

//
// Keep track of cluster clients for redlock
// (Redlock is not used ATM.)
this.clients = [this.client];
if (redisOptions.clients) {
this.clients.push.apply(this.clients, redisOptions.clients);
}
this.redlock = {
driftFactor: REDLOCK_DRIFT_FACTOR,
retryCount: REDLOCK_RETRY_COUNT,
retryDelay: REDLOCK_RETRY_DELAY
};
_.extend(this.redlock, redisOptions.redlock || {});
//
// Create event subscriber client (receive messages from other instance of the queue)
//
this.eclient = createClient('subscriber');
this.eclient = createClient('subscriber', redisOpts);

@@ -173,7 +152,12 @@ this.handlers = {};

this.LOCK_DURATION = LOCK_DURATION;
this.LOCK_RENEW_TIME = LOCK_DURATION / 2;
this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL;
this.MAX_STALLED_JOB_COUNT = MAX_STALLED_JOB_COUNT;
this.settings = _.defaults(opts.settings, {
lockDuration: 30000,
stalledInterval: 30000,
maxStalledCount: 1,
guardInterval: 5000,
retryProcessDelay: 5000
});
this.settings.lockRenewTime = this.settings.lockRenewTime || this.settings.lockDuration / 2;
// bubble up Redis error events

@@ -184,2 +168,6 @@ [this.client, this.eclient].forEach(function (client) {

this.on('error', function(){
// Dummy handler to avoid process to exit with an unhandled exception.
});
// keeps track of active timers. used by close() to

@@ -190,77 +178,16 @@ // ensure that disconnect() is deferred until all

// emit ready when redis connections ready
var initializers = [this.client, this.eclient].map(function (client) {
return new Promise(function(resolve, reject) {
client.once('ready', resolve);
client.once('error', reject);
});
});
var events = [
'delayed',
'paused',
'resumed',
'added'
]
this._initializing = Promise.all(initializers).then(function(){
return Promise.all(events.map(function(event){
return _this.eclient.subscribe(_this.toKey(event))
})).then(function(){
return commands(_this.client);
});
}).then(function(){
debuglog(name + ' queue ready');
_this.emit('ready');
}, function(err){
_this.emit('error', err, 'Error initializing queue');
});
//
// Handle delay, pause and resume messages
// Init
//
this.eclient.on('message', function(channel, message){
switch(channel){
case _this.toKey('delayed'):
_this.updateDelayTimer(message);
break;
case _this.toKey('paused'):
case _this.toKey('resumed'):
_this.emit(message);
break;
case _this.toKey('added'):
_this.emit('added', message);
break;
}
});
this._init(name);
Disturbed.call(this, _this.client, _this.eclient);
//
// Listen distributed queue events
//
listenDistEvent('waiting'); //
listenDistEvent('active'); //
listenDistEvent('progress'); //
listenDistEvent('stalled'); //
listenDistEvent('completed'); //
listenDistEvent('failed'); //
listenDistEvent('cleaned');
listenDistEvent('remove'); //
// Only setup listeners if .on/.addEventListener called, or process function defined.
//
this._setupQueueEventListeners();
function listenDistEvent(eventName){
var _eventName = eventName + '@' + name;
_this.on(_eventName, function(){
var args = Array.prototype.slice.call(arguments);
if(eventName !== 'cleaned' && eventName !== 'error'){
args[0] = Job.fromJSON(_this, args[0]);
}
args.unshift('global:' + eventName);
_this.emit.apply(_this, args);
}, true);
}
this.delayedTimestamp = Number.MAX_VALUE;
this.isReady().then(function(){
// TODO: These are only useful if a process function has been defined.
//

@@ -291,7 +218,20 @@ // Init delay timestamp.

util.inherits(Queue, Disturbed);
function redisOptsFromUrl(urlString){
var redisOpts = {};
try {
var redisUrl = url.parse(urlString);
redisOpts.port = redisUrl.port;
redisOpts.host = redisUrl.hostname;
if (redisUrl.auth) {
redisOpts.password = redisUrl.auth.split(':')[1];
}
} catch (e) {
throw new Error(e.message);
}
return redisOpts;
}
function setGuardianTimer(queue){
return setInterval(function() {
if(queue.delayedTimestamp < Date.now() || queue.delayedTimestamp - Date.now() > POLLING_INTERVAL){
if(queue.delayedTimestamp < Date.now() || queue.delayedTimestamp - Date.now() > queue.settings.guardInterval){
scripts.updateDelaySet(queue, Date.now()).then(function(timestamp){

@@ -309,45 +249,109 @@ if(timestamp){

queue.emit('added');
}, POLLING_INTERVAL);
}, queue.settings.guardInterval);
}
Queue.ErrorMessages = errors.Messages;
util.inherits(Queue, EventEmitter);
Queue.prototype.isReady = function(){
Queue.prototype.off = Queue.prototype.removeListener;
Queue.prototype._init = function(name){
var _this = this;
return this._initializing.then(function(){
return _this;
var initializers = [this.client, this.eclient].map(function (client) {
var _resolve, errorHandler;
return new Promise(function(resolve, reject) {
_resolve = resolve;
errorHandler = function(err){
if(err.code !== 'ECONNREFUSED'){
reject(err);
}
};
client.once('ready', resolve);
client.on('error', errorHandler);
}).finally(function(){
client.removeListener('ready', _resolve);
client.removeListener('error', errorHandler);
});
});
}
Queue.prototype.whenCurrentMoveFinished = function(){
var currentMove = this.client.commandQueue.peekFront()
return currentMove && currentMove.command.promise || Promise.resolve();
this._initializing = Promise.all(initializers).then(function(){
return _this.eclient.psubscribe(_this.toKey('') + '*');
}).then(function(){
return commands(_this.client);
}).then(function(){
debuglog(name + ' queue ready');
}, function(err){
_this.emit('error', err, 'Error initializing queue');
throw err;
});
};
/**
*
* Emits a distributed event.
*/
Queue.prototype.distEmit = function(){
var args = Array.prototype.slice.call(arguments);
// Emit local event
this.emit.apply(this, args);
Queue.prototype._setupQueueEventListeners = function(){
/*
if(eventName !== 'cleaned' && eventName !== 'error'){
args[0] = Job.fromJSON(_this, args[0]);
}
*/
var _this = this;
var activeKey = _this.toKey('active');
var progressKey = _this.toKey('progress');
var delayedKey = _this.toKey('delayed');
var pausedKey = _this.toKey('paused');
var resumedKey = _this.toKey('resumed');
var addedKey = _this.toKey('added');
var completedKey = _this.toKey('completed');
var failedKey = _this.toKey('failed');
// Emit global event
args[0] = args[0] + '@' + this.name;
return Disturbed.prototype.distEmit.apply(this, args);
}
this.eclient.on('pmessage', function(channel, pattern, message){
var keyAndToken = pattern.split('@');
var key = keyAndToken[0];
var token = keyAndToken[1];
Queue.prototype.on = function(){
var args = Array.prototype.slice.call(arguments);
var promise = Disturbed.prototype.on.apply(this, args);
switch(key){
case activeKey:
_this.emit('global:active', message, 'waiting');
break;
case progressKey:
var jobAndProgress = message.split(':');
_this.emit('global:progress', jobAndProgress[0], jobAndProgress[1]);
break;
case delayedKey:
_this.updateDelayTimer(message);
break;
case pausedKey:
case resumedKey:
_this.emit('global:' + message);
break;
case addedKey:
_this.emit('added', message);
if(_this.token === token){
_this.emit('waiting', message, null);
}
token && _this.emit('global:waiting', message, null);
break;
case completedKey:
var data = JSON.parse(message);
var job = Job.fromJSON(_this, data.job);
_this.emit('global:completed', job, data.val, 'active');
break;
case failedKey:
var data = JSON.parse(message);
var job = Job.fromJSON(_this, data.job);
_this.emit('global:failed', job, data.val, 'active');
break;
}
});
};
Queue.ErrorMessages = errors.Messages;
Queue.prototype.isReady = function(){
var _this = this;
promise.catch(function(err){ _this.emit('error', err); });
return this;
return this._initializing.then(function(){
return _this;
});
};
Queue.prototype.once = function(){
var args = Array.prototype.slice.call(arguments);
Disturbed.prototype.once.apply(this, args);
return this;
Queue.prototype.whenCurrentMoveFinished = function(){
var currentMove = this.client.commandQueue.peekFront();
return currentMove && currentMove.command.promise || Promise.resolve();
};

@@ -380,3 +384,3 @@

return this.closing = this._initializing.then(function(){
return this.closing = this.isReady().then(function(){
_.each(_this.errorRetryTimer, function(timer){

@@ -403,2 +407,10 @@ clearTimeout(timer);

Deprecate in favor of:
/*
queue.work('export', opts, function(job, input){
return output;
}, 'adrapid-export-results');
@method process

@@ -432,3 +444,3 @@ */

});
}
};

@@ -449,3 +461,2 @@ Queue.prototype.setHandler = function(name, handler){

/**

@@ -532,59 +543,37 @@ interface JobOptions

Queue.prototype.pause = function(isLocal, doNotWaitActive){
if(isLocal){
var _this = this;
if(!this.paused){
this.paused = new Promise(function(resolve) {
_this.resumeLocal = function() {
resolve();
_this.paused = null; // Allow pause to be checked externally for paused state.
};
});
var _this = this;
return _this.isReady().then(function(){
if(isLocal){
if(!_this.paused){
_this.paused = new Promise(function(resolve) {
_this.resumeLocal = function() {
resolve();
_this.paused = null; // Allow pause to be checked externally for paused state.
};
});
}
return !doNotWaitActive && _this.whenCurrentJobsFinished();
}else{
return scripts.pause(_this, true);
}
return !doNotWaitActive && this.whenCurrentJobsFinished();
}else{
return pauseResumeGlobal(this, true);
}
}).then(function(){
_this.emit('paused');
});
};
Queue.prototype.resume = function(isLocal /* Optional */){
if(isLocal){
if(this.resumeLocal){
this.resumeLocal();
var _this = this;
return this.isReady().then(function(){
if(isLocal){
if(_this.resumeLocal){
_this.resumeLocal();
}
}else{
return scripts.pause(_this, false);
}
return Promise.resolve();
}else{
return pauseResumeGlobal(this, false);
}
}).then(function(){
_this.emit('resumed');
});
};
//
// TODO: move to scripts module.
//
function pauseResumeGlobal(queue, pause){
var src = 'wait', dst = 'paused';
if(!pause){
src = 'paused';
dst = 'wait';
}
var script = [
'if redis.call("EXISTS", KEYS[1]) == 1 then',
' redis.call("RENAME", KEYS[1], KEYS[2])',
'end',
'if ARGV[1] == "paused" then',
' redis.call("SET", KEYS[3], 1)',
'else',
' redis.call("DEL", KEYS[3])',
'end',
'redis.call("PUBLISH", KEYS[4], ARGV[1])'
].join('\n');
var keys = _.map([src, dst, 'meta-paused', 'paused'], function(name){
return queue.toKey(name);
});
return queue.client.eval(script, keys.length, keys[0], keys[1], keys[2], keys[3], pause ? 'paused' : 'resumed');
}
Queue.prototype.run = function(concurrency){

@@ -655,3 +644,3 @@ var promises = [];

return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('failed', job, new Error('job stalled more than allowable limit'), 'active' );
_this.emit('failed', job, new Error('job stalled more than allowable limit'), 'active' );
return null;

@@ -662,3 +651,3 @@ });

return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('stalled', job);
_this.emit('stalled', job);
return null;

@@ -675,5 +664,5 @@ });

clearInterval(this.moveUnlockedJobsToWaitInterval);
if (this.STALLED_JOB_CHECK_INTERVAL > 0){
if (this.settings.stalledInterval > 0){
this.moveUnlockedJobsToWaitInterval =
setInterval(this.moveUnlockedJobsToWait, this.STALLED_JOB_CHECK_INTERVAL);
setInterval(this.moveUnlockedJobsToWait, this.settings.stalledInterval);
}

@@ -690,3 +679,3 @@ };

.then(_this.processJob)
.then(processJobs, function(err){
.then(processJobs, function(/*err*/){
//

@@ -698,3 +687,3 @@ // Wait before trying to process again.

processJobs();
}, RETRY_PROCESS_DELAY)
}, _this.settings.retryProcessDelay);
});

@@ -728,14 +717,12 @@ }).catch(function(err){

var lockExtender = function(){
_this.timers.set('lockExtender', _this.LOCK_RENEW_TIME, function(){
if(!timerStopped){
scripts.extendLock(_this, job.id).then(function(lock){
if(lock){
lockExtender();
}
}).catch(function(err){
// Somehow tell the worker this job should stop processing...
});
}
lockRenewId = _this.timers.set('lockExtender', _this.settings.lockRenewTime, function(){
scripts.extendLock(_this, job.id).then(function(lock){
if(lock && !timerStopped){
lockExtender();
}
}).catch(function(/*err*/){
// Somehow tell the worker this job should stop processing...
});
});
}
};

@@ -757,6 +744,4 @@ var timeoutMs = job.opts.timeout;

return job.moveToCompleted(result).then(function(){
return _this.distEmit('completed', job, result, 'active');
}).finally(function(){
stopTimer();
})
_this.emit('completed', job, result, 'active');
});
}

@@ -769,9 +754,7 @@

return job.moveToFailed(err).then(function(){
return _this.distEmit('failed', job, error, 'active');
}).finally(function(){
stopTimer();
})
_this.emit('failed', job, error, 'active');
});
}
lockExtender()
lockExtender();
var handler = _this.handlers[job.name];

@@ -787,5 +770,10 @@ if(!handler){

_this.distEmit('active', job, jobPromise, 'waiting');
// Local event with jobPromise so that we can cancel job.
// Probably we could have better ways to do this...
// For example, listen to a global event 'cancel'
_this.emit('active', job, jobPromise, 'waiting');
return jobPromise.then(handleCompleted, handleFailed);
return jobPromise.then(handleCompleted, handleFailed).finally(function(){
stopTimer();
});
}

@@ -816,5 +804,5 @@ };

_resolve();
}
};
_this.on('added', resolve);
_this.on('resumed', resolve);
_this.on('global:resumed', resolve);
_this.on('wait-finished', resolve);

@@ -825,5 +813,5 @@ });

_this.removeListener('added', resolve);
_this.removeListener('resumed', resolve);
_this.removeListener('global:resumed', resolve);
_this.removeListener('wait-finished', resolve);
}
};

@@ -875,5 +863,5 @@ return scripts.moveToActive(this).spread(function(jobData, jobId){

return res.map(function(v) {
return v[1]
return v[1];
}).reduce(function(a, b) {
return a + b
return a + b;
});

@@ -1014,3 +1002,3 @@ }) || 0;

return scripts.cleanJobsInSet(_this, type, Date.now() - grace, limit).then(function (jobs) {
_this.distEmit('cleaned', jobs, type);
_this.emit('cleaned', jobs, type);
return jobs;

@@ -1017,0 +1005,0 @@ }).catch(function (err) {

@@ -21,2 +21,2 @@

}
};
/**
* Includes all the scripts needed by the queue and jobs.
*
*
*/
/*eslint-env node */
/*global Promise:true */
'use strict';
var Promise = require('bluebird');
var _ = require('lodash');

@@ -33,5 +30,3 @@ var debuglog = require('debuglog')('bull');

addJob: function(client, toKey, job, opts){
var queue = job.queue;
addJob: function(client, toKey, job, opts, token){
var keys = _.map(['wait', 'paused', 'meta-paused', 'added', 'id', 'delayed', 'priority'], function(name){

@@ -41,5 +36,7 @@ return toKey(name);

keys[3] = keys[3] + '@' + token;
var args = [
toKey(''),
_.isUndefined(opts.customJobId) ? "" : opts.customJobId,
_.isUndefined(opts.customJobId) ? '' : opts.customJobId,
job.name,

@@ -50,5 +47,5 @@ job.data,

job.delay,
job.delay ? job.timestamp + job.delay : "0",
opts.priority || 0,
opts.lifo ? "LIFO" : "FIFO"
job.delay ? job.timestamp + job.delay : '0',
opts.priority || 0,
opts.lifo ? 'LIFO' : 'FIFO'
];

@@ -60,7 +57,4 @@

pause: function(queue, pause) {
var src, dst;
if(pause){
src = 'wait';
dst = 'paused';
}else{
var src = 'wait', dst = 'paused';
if(!pause){
src = 'paused';

@@ -78,14 +72,12 @@ dst = 'wait';

moveToActive: function(queue){
var keys = _.map([
'wait',
'active',
'priority'], function(name){
return queue.toKey(name);
}
);
var keys = _.map(['wait','active','priority'], function(name){
return queue.toKey(name);
});
keys[3] = keys[1] + '@' + queue.token;
var args = [
queue.toKey(''),
queue.token,
queue.LOCK_DURATION
queue.settings.lockDuration
];

@@ -105,13 +97,22 @@

updateProgress: function(job, progress){
var queue = job.queue;
var keys = [job.id, 'progress'].map(function(name){
return queue.toKey(name);
});
return queue.client.updateProgress(keys, [progress, job.id + ':' + progress]).then(function(){
queue.emit('progress', job, progress);
});
},
moveToFinishedArgs: function(job, val, propVal, shouldRemove, target, ignoreLock){
var queue = job.queue;
var keys = _.map([
'active',
target,
job.id], function(name){
return queue.toKey(name);
}
);
var keys = _.map(['active', target, job.id], function(name){
return queue.toKey(name);
});
job[propVal] = val;
var args = [

@@ -122,4 +123,5 @@ job.id,

val,
ignoreLock ? "0" : job.queue.token,
shouldRemove ? "1" : "0"
ignoreLock ? '0' : job.queue.token,
shouldRemove ? '1' : '0',
JSON.stringify({job: job.toJSON(), val: val})
];

@@ -173,10 +175,5 @@

var keys = _.map([
'active',
'delayed',
jobId
], function(name){
return queue.toKey(name);
}
);
var keys = _.map(['active', 'delayed', jobId], function(name){
return queue.toKey(name);
});
return keys.concat([JSON.stringify(context), jobId]);

@@ -202,6 +199,6 @@ },

'failed',
jobId], function(name){
return queue.toKey(name);
}
);
jobId],
function(name){
return queue.toKey(name);
});

@@ -212,3 +209,3 @@ return queue.client.removeJob(keys.concat([jobId, queue.token]));

extendLock: function(queue, jobId){
return queue.client.extendLock([queue.toKey(jobId) + ':lock', queue.token, queue.LOCK_DURATION]);
return queue.client.extendLock([queue.toKey(jobId) + ':lock', queue.token, queue.settings.lockDuration]);
},

@@ -221,3 +218,3 @@

takeLock: function(queue, job){
return queue.client.takeLock([job.lockKey(), queue.token, queue.LOCK_DURATION]);
return queue.client.takeLock([job.lockKey(), queue.token, queue.settings.lockDuration]);
},

@@ -234,4 +231,5 @@

'wait',
'added'], function(name){
return queue.toKey(name);
'added'],
function(name){
return queue.toKey(name);
});

@@ -250,3 +248,3 @@

* back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait,
* (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to MAX_STALLED_JOB_COUNT.
* (e.g. if the job handler keeps crashing), we limit the number stalled job recoveries to settings.maxStalledCount.
*/

@@ -257,3 +255,3 @@ moveUnlockedJobsToWait: function(queue){

});
var args = [queue.MAX_STALLED_JOB_COUNT, queue.toKey(''), Date.now()];
var args = [queue.settings.maxStalledCount, queue.toKey(''), Date.now()];
return queue.client.moveUnlockedJobsToWait(keys.concat(args));

@@ -346,11 +344,2 @@ },

retryJob: function(job){
var args = scripts.retryJobArgs(job);
return queue.client.retryJob(args).then(function(result){
if(result === -1){
throw new Error('Missing Job ' + jobId + ' during retry');
}
});
},
/**

@@ -378,3 +367,3 @@ * Attempts to reprocess a job

queue.toKey('wait'),
queue.toKey('added')
queue.toKey('added') + '@' + queue.token
];

@@ -396,5 +385,5 @@

for(var i=0; i < arr.length; i+=2){
obj[arr[i]] = arr[i+1]
obj[arr[i]] = arr[i+1];
}
return obj;
}
}

@@ -9,7 +9,6 @@ # Migration from 2.x to 3.0.0

In 3.x, the jobs that are completed and failed end in two ZSETS, instead of a standard SET.
This gives the posibility of retrieving a subset of the jobs in a high performant way, which
is useful for grafical tools and scripts.
This gives the possibility of retrieving a subset of the jobs in a high performant way, which
is useful for graphical tools and scripts. However an old queue will not be compatible with 3.x.
You will need to either delete the complete and failed keys, or create a new queue.
migrate tool?
# Data structure changes

@@ -23,4 +22,20 @@

# Queue instantiation options
Sanitized and cleaned all the options. Check the README to see the new structure.
# Events
All events are now published atomically in the scripts where they are relevant, this increases efficiency and
reduces chances for hazards.
'ready' event has been removed, you can use ```Queue##isReady()``` instead if you want to know when the queue
has been initialized. Normally you will never need to wait for readyness since this is taken care internally
by the queue methods that require the queue to be ready.
Events arguments are now the same for local and global events. This affects events such as completed and failed,
where in 2.x the first argument was a job instance for local jobs. Now both local and global events pass
jobId as first argument to the event handler. If the job instance is needed it can be easily retrieved with
```Job.fromId()```.
{
"name": "bull",
"version": "3.0.0-alpha.2",
"version": "3.0.0-alpha.3",
"description": "Job manager",
"main": "index.js",
"main": "./lib/queue",
"repository": {

@@ -21,5 +21,3 @@ "type": "git",

"bluebird": "^3.5.0",
"bull-redlock": "^2.2.1",
"debuglog": "^1.0.0",
"disturbed": "^1.0.6",
"ioredis": "^3.0.0-1",

@@ -31,5 +29,4 @@ "lodash": "^4.17.4",

"devDependencies": {
"eslint": "^2.13.1",
"expect.js": "^0.3.1",
"gulp": "^3.9.1",
"gulp-eslint": "^2.1.0",
"mocha": "^2.5.3",

@@ -39,5 +36,31 @@ "sinon": "^1.17.7"

"scripts": {
"test": "gulp && mocha test/test_* --reporter spec --timeout 5000",
"lint": "eslint lib test",
"pretest": "npm run lint",
"test": "mocha",
"postpublish": "git push && git push --tags"
},
"eslintConfig": {
"rules": {
"indent": [
2,
2,
{
"SwitchCase": 1
}
],
"semi": 2,
"valid-jsdoc": 0,
"func-style": 0,
"no-use-before-define": 0,
"camelcase": 1,
"no-unused-vars": 1,
"no-alert": 1,
"no-console": [2, {"allow": ["warn", "error"]}],
"quotes": [
2,
"single"
],
"no-underscore-dangle": 0
}
}
}

@@ -14,3 +14,3 @@ Bull Job Manager

The fastest, most reliable redis based queue for nodejs.
The fastest, most reliable Redis based queue for nodejs.
Carefully written for rock solid stability and atomicity.

@@ -54,4 +54,2 @@

We also have an official UI which is at the moment bare bones project: [bull-ui](https://github.com/OptimalBits/bull-ui)
Roadmap:

@@ -71,3 +69,3 @@ --------

Note that you need a redis version higher or equal than 2.8.11 for bull to work properly.
Note that you need a Redis version higher or equal than 2.8.11 for bull to work properly.

@@ -84,6 +82,6 @@ **IMPORTANT**

var videoQueue = Queue('video transcoding', 6379, '127.0.0.1');
var audioQueue = Queue('audio transcoding', 6379, '127.0.0.1');
var imageQueue = Queue('image transcoding', 6379, '127.0.0.1');
var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1');
var videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
var audioQueue = new Queue('audio transcoding', {redis: {port: 6379, host: '127.0.0.1'}}); // Specify Redis connection using object
var imageQueue = new Queue('image transcoding');
var pdfQueue = new Queue('pdf transcoding');

@@ -108,3 +106,3 @@ videoQueue.process(function(job, done){

// If the job throws an unhandled exception it is also handled correctly
throw (Error('some unexpected error'));
throw new Error('some unexpected error');
});

@@ -126,3 +124,3 @@

// If the job throws an unhandled exception it is also handled correctly
throw (Error('some unexpected error'));
throw new Error('some unexpected error');
});

@@ -144,3 +142,3 @@

// If the job throws an unhandled exception it is also handled correctly
throw (Error('some unexpected error'));
throw new Error('some unexpected error');
});

@@ -241,4 +239,4 @@

```javascript
var userJohn = Queue('john');
var userLisa = Queue('lisa');
var userJohn = new Queue('john');
var userLisa = new Queue('lisa');
.

@@ -258,3 +256,3 @@ .

var numWorkers = 8;
var queue = Queue("test concurrent queue", 6379, '127.0.0.1');
var queue = new Queue("test concurrent queue");

@@ -287,4 +285,12 @@ if(cluster.isMaster){

The queue aims for "at most once" working strategy. When a worker is processing a job, it will keep the job locked until the work is done. However, it is important that the worker does not lock the event loop too long, otherwise other workers could pick the job believing that the worker processing it has been stalled.
The queue aims for "at most once" working strategy. When a worker is processing a job it will keep the job "locked" so other workers can't process it.
It's important to understand how locking works to prevent your jobs from losing their lock - becoming _stalled_ - and being restarted as a result. Locking is implemented internally by creating a lock for `lockDuration` on interval `lockRenewTime` (which is usually half `lockDuration`). If `lockDuration` elapses before the lock can be renewed, the job will be considered stalled and is automatically restarted; it will be __double processed__. This can happen when:
1. The Node process running your job processor unexpectedly terminates.
2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the `lockDuration` setting (with the tradeoff being that it will take longer to recognize a real stalled job).
As such, you should always listen for the `stalled` event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.
As a safeguard so problematic jobs won't get restarted indefinitely (e.g. if the job processor aways crashes its Node process), jobs will be recovered from a stalled state a maximum of `maxStalledCount` times (default: `1`).
Reusing Redis connections

@@ -303,15 +309,11 @@ -------------------------

var opts = {
redis: {
opts: {
createClient: function(type){
switch(type){
case 'client':
return client;
case 'subscriber':
return subscriber;
default:
return new redis();
}
createClient: function(type, opts){
switch(type){
case 'client':
return client;
case 'subscriber':
return subscriber;
default:
return new redis(opts);
}
}
}

@@ -339,4 +341,4 @@ }

var sendQueue = Queue("Server B");
var receiveQueue = Queue("Server A");
var sendQueue = new Queue("Server B");
var receiveQueue = new Queue("Server A");

@@ -355,4 +357,4 @@ receiveQueue.process(function(job, done){

var sendQueue = Queue("Server A");
var receiveQueue = Queue("Server B");
var sendQueue = new Queue("Server A");
var receiveQueue = new Queue("Server B");

@@ -406,8 +408,5 @@ receiveQueue.process(function(job, done){

```ts
Queue(queueName: string, redisPort: number, redisHost: string, redisOpts?: RedisOpts): Queue
```typescript
new Queue(queueName: string, redisConnectionString?: string, opts: QueueOptions): Queue
```
```ts
Queue(queueName: string, redisConnectionString: string, redisOpts? RedisOpts): Queue
```

@@ -418,21 +417,47 @@ This is the Queue constructor. It creates a new Queue that is persisted in

If no connection string or options passed, the queue will use ioredis default connection
settings.
__Arguments__
```javascript
queueName {String} A unique name for this Queue.
redisPort {Number} A port where redis server is running.
redisHost {String} A host specified as IP or domain where redis is running.
redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
```typescript
queueName: string, // A unique name for this Queue.
redisConnectionString?: string, // string A connection string containing the redis server host, port and (optional) authentication.
opts?: QueueOptions, // Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
```
Alternatively, it's possible to pass a connection string to create a new queue.
```typescript
interface QueueOptions {
prefix?: string = 'bull',
redis : RedisOpts, // ioredis defaults
createClient?: (type: enum('client', 'subscriber'), redisOpts?: RedisOpts) => redisClient,
__Arguments__
```javascript
queueName {String} A unique name for this Queue.
redisConnectionString {String} A connection string containing the redis server host, port and (optional) authentication.
redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
// Advanced settings (see below)
settings?: QueueSettings {
lockDuration?: number = 30000,
lockRenewTime?: number = lockDuration / 2,
stalledInterval?: number = 30000,
maxStalledCount?: number = 1,
guardInterval?: number = 5000,
retryProcessDelay?: number = 5000,
}
}
```
__Advanced Settings__
__Warning:__ Do not override these advanced settings unless you understand the internals of the queue.
`lockDuration`: Time in milliseconds to acquire the job lock. Set this to a higher value if you find that your jobs are being stalled because your job processor is CPU-intensive and blocking the event loop (see note below about stalled jobs). Set this to a lower value if your jobs are extremely time-sensitive and it might be OK if they get double-processed (due to them being falsly considered stalled).
`lockRenewTime`: Interval in milliseconds on which to acquire the job lock. It is set to `lockDuration / 2` by default to give enough buffer to renew the lock each time before the job lock expires. It should never be set to a value larger than `lockDuration`. Set this to a lower value if you're finding that jobs are becoming stalled due to a CPU-intensive job processor function. Generally you shouldn't change this though.
`stalledInterval`: Interval in milliseconds on which each worker will check for stalled jobs. See note below about stalled jobs. Set this to a lower value if your jobs are extremely time-sensitive. Set this to a higher value if your Redis CPU usage is high as this check can be expensive. Note that because each worker runs this on its own interval and checks the entire queue, the stalled job check actually runs on the queue much more frequently than this value would imply.
`maxStalledCount`: The maximum number of times a stalled job can be restarted before it will be permamently failed with the error `job stalled more than allowable limit`. This is set to a default of `1` with the assumption that stalled jobs should be very rare (i.e. only due to process crashes) and you want to be on the safer side of not double-processing jobs. Set this higher if stalled jobs are common (e.g. processes crash a lot) and it's generally OK to double-process jobs.
`guardInterval`: Interval in milliseconds on which the delayed job watchdog will run. This watchdog is only in place for unstable Redis connections which can caused delayed jobs to not be processed. Set to a lower value if your Redis connection is unstable and delayed jobs aren't being processed in time.
`retryProcessDelay`: Time in milliseconds in which to wait before trying to process jobs, in case of a Redis error. Set to a lower value if your Redis connection is unstable.
---------------------------------------

@@ -632,3 +657,3 @@

var Queue = require('bull');
var queue = Queue('example');
var queue = new Queue('example');

@@ -849,3 +874,3 @@ var after100 = _.after(100, function () {

Warning!!: Priority queue use 5 times more redis connections than a normal queue.
Warning!!: Priority queue use 5 times more Redis connections than a normal queue.

@@ -852,0 +877,0 @@

@@ -27,5 +27,5 @@ /*eslint-env node */

}).catch(function (err) {
console.err(err);
console.error(err);
// process.exit(-1);
});
});

@@ -57,3 +57,3 @@ /*eslint-env node */

workers.push(worker);
console.log('Worker spawned: #', worker.id);
// console.log('Worker spawned: #', worker.id);
}

@@ -60,0 +60,0 @@ });

@@ -33,3 +33,3 @@ /*eslint-env node */

// Simulate disconnect
queue.on('ready', function(){
queue.isReady().then(function(){
queue.client.stream.end();

@@ -93,3 +93,3 @@ queue.client.emit('error', new Error('ECONNRESET'));

queue.on('ready', function(){
queue.isReady().then(function(){
queue.add({ 'foo': 'bar' });

@@ -96,0 +96,0 @@ });

@@ -21,7 +21,7 @@ /*eslint-env node */

beforeEach(function(){
queue = new Queue('test-' + uuid(), 6379, '127.0.0.1');
queue = new Queue('test-' + uuid(), {redis: {port: 6379, host: '127.0.0.1'}});
});
afterEach(function(){
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT));
this.timeout(queue.settings.stalledInterval * (1 + queue.settings.maxStalledCount));
return queue.close();

@@ -123,3 +123,3 @@ });

expect(isActive).to.be(true);
return job.releaseLock()
return job.releaseLock();
}).then(function(){

@@ -183,5 +183,7 @@ return job.remove();

queue.once('failed', function (job) {
queue.once('waiting', function (job2) {
expect(job2.data.foo).to.be.equal('bar');
cb();
queue.once('waiting', function (jobId2) {
Job.fromId(queue, jobId2).then(function(job2){
expect(job2.data.foo).to.be.equal('bar');
cb();
});
});

@@ -224,12 +226,2 @@ job.retry();

it('can renew a previously taken lock', function(){
return job.takeLock().then(function(lockTaken){
expect(lockTaken).to.be.truthy;
}).then(function(){
return job.renewLock().then(function(lockRenewed){
expect(lockRenewed).to.be.truthy;;
});
});
});
it('can release a lock', function(){

@@ -468,3 +460,3 @@ return job.takeLock().then(function(lockTaken){

it('should reject when the job has been completed', function(done){
it('should reject when the job has been failed', function(done){
queue.process(function () {

@@ -492,3 +484,3 @@ return Promise.delay(500).then(function(){

return job.finished();
})
});
}).then(function(){

@@ -495,0 +487,0 @@ done();

@@ -7,2 +7,3 @@ /*eslint-env node */

var STD_QUEUE_NAME = 'test queue';
var _ = require('lodash');

@@ -16,4 +17,5 @@ var queues = [];

function buildQueue(name) {
var queue = new Queue(name || STD_QUEUE_NAME, 6379, '127.0.0.1');
function buildQueue(name, options) {
options = _.extend({redis: {port: 6379, host: '127.0.0.1'}}, options);
var queue = new Queue(name || STD_QUEUE_NAME, options);
queues.push(queue);

@@ -23,9 +25,5 @@ return queue;

function newQueue(name){
var queue = buildQueue(name);
return new Promise(function(resolve){
queue.on('ready', function(){
resolve(queue);
});
});
function newQueue(name, opts){
var queue = buildQueue(name, opts);
return queue.isReady();
}

@@ -32,0 +30,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc