Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 1.1.3 to 2.0.0

8

CHANGELOG.md

@@ -0,1 +1,9 @@

v.2.0.0
=======
- Changed redis module to ioredis fixing many issues along the way, see changes.
[Changes](https://github.com/OptimalBits/bull/compare/v1.1.3...v2.0.0)
v.1.1.3

@@ -2,0 +10,0 @@ =======

73

lib/job.js

@@ -8,3 +8,2 @@ /*eslint-env node */

var debuglog = require('debuglog')('bull');
var uuid = require('node-uuid');

@@ -63,7 +62,7 @@ /**

}
return queue.client.hgetallAsync(queue.toKey(jobId)).then(function(jobData){
if(jobData){
return queue.client.hgetall(queue.toKey(jobId)).then(function(jobData){
if(!_.isEmpty(jobData)){
return Job.fromData(queue, jobId, jobData);
}else{
return jobData;
return null;
}

@@ -90,3 +89,3 @@ });

var _this = this;
return this.queue.client.hsetAsync(this.queue.toKey(this.jobId), 'progress', progress).then(function(){
return this.queue.client.hset(this.queue.toKey(this.jobId), 'progress', progress).then(function(){
_this.queue.distEmit('progress', _this.toJSON(), progress);

@@ -103,3 +102,3 @@ });

Job.prototype.toJSON = function(){
var opts = this.opts || {};
var opts = _.extend({}, this.opts || {});
opts.jobId = this.jobId;

@@ -130,5 +129,8 @@ return {

*/
Job.prototype.takeLock = function(token, renew, ensureActive){
return scripts.takeLock(this.queue, this, token, renew, ensureActive).then(function(res){
return res === 1; // Indicates successful lock.
Job.prototype.takeLock = function(renew, ensureActive){
var _this = this;
return scripts.takeLock(this.queue, this, renew, ensureActive)
.then(function(lock) {
if (lock) _this.lock = lock;
return lock || false;
});

@@ -140,4 +142,4 @@ };

*/
Job.prototype.renewLock = function(token){
return this.takeLock(token, true /* Renew */);
Job.prototype.renewLock = function(){
return this.takeLock(true /* Renew */);
};

@@ -148,4 +150,6 @@

*/
Job.prototype.releaseLock = function(token){
return scripts.releaseLock(this, token);
Job.prototype.releaseLock = function(){
var _this = this;
return scripts.releaseLock(this)
.then(function() { _this.lock = null; });
};

@@ -165,8 +169,8 @@

Job.prototype.moveToCompleted = function(returnValue, token){
Job.prototype.moveToCompleted = function(returnValue){
this.returnvalue = returnValue || 0;
return scripts.moveToCompleted(this, token || 0, this.opts.removeOnComplete);
return scripts.moveToCompleted(this || 0, this.opts.removeOnComplete);
};
Job.prototype.move = function(src, target, token, returnValue){
Job.prototype.move = function(src, target, returnValue){
if(target === 'completed'){

@@ -178,3 +182,3 @@ this.returnvalue = returnValue || 0;

}
return scripts.move(this, token || 0, src, target);
return scripts.move(this || 0, src, target);
}

@@ -223,3 +227,3 @@

return queue.client.evalAsync(
return queue.client.eval(
script,

@@ -269,3 +273,3 @@ keys.length,

return this.queue.client
.zrankAsync(this.queue.toKey('delayed'), this.jobId).then(function(rank) {
.zrank(this.queue.toKey('delayed'), this.jobId).then(function(rank) {
return rank !== null;

@@ -319,9 +323,7 @@ });

*/
Job.prototype.remove = function(token){
Job.prototype.remove = function(){
var queue = this.queue;
var job = this;
var token = token || uuid();
return job.takeLock(token).then(function(lock) {
return job.takeLock().then(function(lock) {
if (!lock) {

@@ -335,3 +337,3 @@ throw new Error('Could not get lock for job: ' + job.jobId + '. Cannot remove job.');

.finally(function () {
return job.releaseLock(token);
return job.releaseLock();
});

@@ -415,3 +417,3 @@ });

return this.queue.client
.sismemberAsync(this.queue.toKey(list), this.jobId).then(function(isMember){
.sismember(this.queue.toKey(list), this.jobId).then(function(isMember){
return isMember === 1;

@@ -422,18 +424,3 @@ });

Job.prototype._isInList = function(list) {
var script = [
'local function item_in_list (list, item)',
' for _, v in pairs(list) do',
' if v == item then',
' return 1',
' end',
' end',
' return nil',
'end',
'local items = redis.call("LRANGE", KEYS[1], 0, -1)',
'return item_in_list(items, ARGV[1])'
].join('\n');
return this.queue.client.evalAsync(script, 1, this.queue.toKey(list), this.jobId).then(function(result){
return result === 1;
});
return scripts.isJobInList(this.queue.client, this.queue.toKey(list), this.jobId);
};

@@ -484,3 +471,3 @@

return queue.client.evalAsync(
return queue.client.eval(
script,

@@ -514,3 +501,3 @@ keys.length,

return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params);
return this.queue.client.hmset(this.queue.toKey(this.jobId), params);
};

@@ -517,0 +504,0 @@

/*eslint-env node */
'use strict';
var redis = require('redis');
var redis = require('ioredis');
var Disturbed = require('disturbed');

@@ -14,8 +14,5 @@ var util = require('util');

var Promise = require('bluebird');
var uuid = require('node-uuid');
var semver = require('semver');
var debuglog = require('debuglog')('bull');
Promise.promisifyAll(redis.RedisClient.prototype);
Promise.promisifyAll(redis.Multi.prototype);

@@ -62,2 +59,6 @@ /**

var REDLOCK_DRIFT_FACTOR = 0.01;
var REDLOCK_RETRY_COUNT = 0;
var REDLOCK_RETRY_DELAY = 200;
var Queue = function Queue(name, redisPort, redisHost, redisOptions){

@@ -74,3 +75,3 @@ if(!(this instanceof Queue)){

redisOptions = redisOpts.opts || {};
redisOptions.db = redisOpts.DB;
redisOptions.db = redisOpts.DB || redisOpts.DB;
} else if(parseInt(redisPort) == redisPort) {

@@ -96,3 +97,2 @@ redisPort = parseInt(redisPort);

redisOptions = redisOptions || {};
var redisDB = redisOptions.db || 0;

@@ -131,2 +131,16 @@ function createClient() {

//
// Keep track of cluster clients for redlock
//
this.clients = [this.client];
if (redisOptions.clients) {
this.clients.push.apply(this.clients, redisOptions.clients);
}
this.redlock = {
driftFactor: REDLOCK_DRIFT_FACTOR,
retryCount: REDLOCK_RETRY_COUNT,
retryDelay: REDLOCK_RETRY_DELAY
};
_.extend(this.redlock, redisOptions.redlock || {});
//
// Create blocking client (used to wait for jobs)

@@ -144,3 +158,2 @@ //

this.token = uuid();
this.LOCK_RENEW_TIME = LOCK_RENEW_TIME;

@@ -161,10 +174,13 @@ this.STALLED_JOB_CHECK_INTERVAL = STALLED_JOB_CHECK_INTERVAL;

// emit ready when redis connections ready
this._initializing = Promise.join(
this.client.selectAsync(redisDB),
this.bclient.selectAsync(redisDB),
this.eclient.selectAsync(redisDB)
).then(function(){
var initializers = [this.client, this.bclient, this.eclient].map(function (client) {
return new Promise(function(resolve) {
client.once('ready', resolve);
});
});
this._initializing = Promise.all(initializers)
.then(function(){
return Promise.join(
_this.eclient.subscribeAsync(_this.toKey('delayed')),
_this.eclient.subscribeAsync(_this.toKey('paused'))
_this.eclient.subscribe(_this.toKey('delayed')),
_this.eclient.subscribe(_this.toKey('paused'))
);

@@ -196,3 +212,3 @@ }).then(function(){

if(eventName !== 'cleaned'){
if(eventName !== 'cleaned' && eventName !== 'error'){
args[0] = Job.fromJSON(_this, args[0]);

@@ -268,3 +284,5 @@ }

var args = Array.prototype.slice.call(arguments);
Disturbed.prototype.on.apply(this, args);
var promise = Disturbed.prototype.on.apply(this, args);
var _this = this;
promise.catch(function(err){ _this.emit('error', err); });
return this;

@@ -297,4 +315,4 @@ };

return Promise.join(
_this.client.quitAsync(),
_this.eclient.quitAsync()
_this.client.quit(),
_this.eclient.quit()
).then(endClients, endClients);

@@ -400,4 +418,4 @@ };

return multi.execAsync().then(function(res){
return Math.max(res[0], res[1]) + res[2];
return multi.exec().then(function(res){
return Math.max(res[0][1], res[1][1]) + res[2][1];
});

@@ -430,3 +448,5 @@ };

return multi.execAsync().spread(function(waiting, paused){
return multi.exec().spread(function(waiting, paused){
waiting = waiting[1];
paused = paused[1];
var jobKeys = (paused.concat(waiting)).map(_this.toKey, _this);

@@ -438,3 +458,3 @@

multi.del.apply(multi, jobKeys);
return multi.execAsync();
return multi.exec();
}

@@ -511,3 +531,3 @@ });

return queue.client.evalAsync(script, keys.length, keys[0], keys[1], keys[2], keys[3], pause ? 'paused' : 'resumed');
return queue.client.eval(script, keys.length, keys[0], keys[1], keys[2], keys[3], pause ? 'paused' : 'resumed');
}

@@ -525,7 +545,3 @@

if (_this.STALLED_JOB_CHECK_INTERVAL > 0){
clearInterval(_this.moveUnlockedJobsToWaitInterval);
_this.moveUnlockedJobsToWaitInterval =
setInterval(_this.moveUnlockedJobsToWait, _this.STALLED_JOB_CHECK_INTERVAL);
}
_this.startMoveUnlockedJobsToWait();

@@ -578,22 +594,26 @@ return Promise.all(promises);

if(this.closing){
return this.closing;
} else{
return scripts.moveUnlockedJobsToWait(this).then(function(responses){
var handleFailedJobs = responses[0].map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit'));
return null;
});
return scripts.moveUnlockedJobsToWait(this).then(function(responses){
var handleFailedJobs = responses[0].map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('failed', job.toJSON(), new Error('job stalled more than allowable limit'));
return null;
});
var handleStalledJobs = responses[1].map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('stalled', job.toJSON());
return null;
});
});
var handleStalledJobs = responses[1].map(function(jobId){
return _this.getJobFromId(jobId).then(function(job){
_this.distEmit('stalled', job.toJSON());
return null;
});
return Promise.all(handleFailedJobs.concat(handleStalledJobs));
}).catch(function(err){
console.error('Failed to handle unlocked job in active:', err);
});
return Promise.all(handleFailedJobs.concat(handleStalledJobs));
}).catch(function(err){
console.error('Failed to handle unlocked job in active:', err);
});
};
Queue.prototype.startMoveUnlockedJobsToWait = function() {
if (this.STALLED_JOB_CHECK_INTERVAL > 0){
clearInterval(this.moveUnlockedJobsToWaitInterval);
this.moveUnlockedJobsToWaitInterval =
setInterval(this.moveUnlockedJobsToWait, this.STALLED_JOB_CHECK_INTERVAL);
}

@@ -640,7 +660,4 @@ };

var lockRenewer = function(){
// The first call to lock the job should ensure that the job is in the 'active' state,
// because it might have gotten picked up already by another processor. We don't need
// to do this on subsequent calls.
return job.takeLock(_this.token, renew, !renew).then(function(locked){
if(locked){
return job.takeLock(renew, true).then(function(lock){
if(lock){
renew = true;

@@ -651,3 +668,3 @@ lockRenewId = _this.timers.set('lockRenewer', _this.LOCK_RENEW_TIME / 2, lockRenewer);

// handler know and cancel the timer?
return locked;
return lock;
}, function(err){

@@ -674,6 +691,5 @@ console.error('Error renewing lock ' + err);

return job.moveToCompleted(data, _this.token)
return job.moveToCompleted(data)
.then(function(){
_this.distEmit('completed', job.toJSON(), data);
return null; // Fixes #253
return _this.distEmit('completed', job.toJSON(), data);
});

@@ -686,6 +702,5 @@ }

return job.moveToFailed(err)
.then(job.releaseLock.bind(job, _this.token))
.then(job.releaseLock.bind(job))
.then(function(){
_this.distEmit('failed', job.toJSON(), error);
return null; // Fixes #253
return _this.distEmit('failed', job.toJSON(), error);
});

@@ -719,9 +734,11 @@ }

Queue.prototype.getNextJob = function(opts){
return this.moveJob('wait', 'active', opts).then(this.getJobFromId);
if(!this.closing){
return this.moveJob('wait', 'active', opts).then(this.getJobFromId);
}else{
return Promise.reject();
}
};
Queue.prototype.multi = function(){
var multi = this.client.multi();
multi.execAsync = Promise.promisify(multi.exec);
return multi;
return this.client.multi();
};

@@ -738,3 +755,3 @@

if(!this.closing){
return this.bclient.rpoplpushAsync(this.toKey(src), this.toKey(dst));
return this.bclient.rpoplpush(this.toKey(src), this.toKey(dst));
}else{

@@ -744,3 +761,3 @@ return Promise.reject();

}else{
return this.bclient.brpoplpushAsync(
return this.bclient.brpoplpush(
this.toKey(src),

@@ -795,4 +812,4 @@ this.toKey(dst),

return multi.execAsync().then(function(res){
return _.reduce(res, function(total, n) {
return multi.exec().then(function(res){
return _.reduce(res[0], function(total, n) {
return total + n;

@@ -804,23 +821,23 @@ }) || 0;

Queue.prototype.getCompletedCount = function() {
return this.client.scardAsync(this.toKey('completed'));
return this.client.scard(this.toKey('completed'));
};
Queue.prototype.getFailedCount = function() {
return this.client.scardAsync(this.toKey('failed'));
return this.client.scard(this.toKey('failed'));
};
Queue.prototype.getDelayedCount = function() {
return this.client.zcardAsync(this.toKey('delayed'));
return this.client.zcard(this.toKey('delayed'));
};
Queue.prototype.getActiveCount = function() {
return this.client.llenAsync(this.toKey('active'));
return this.client.llen(this.toKey('active'));
};
Queue.prototype.getWaitingCount = function() {
return this.client.llenAsync(this.toKey('wait'));
return this.client.llen(this.toKey('wait'));
};
Queue.prototype.getPausedCount = function() {
return this.client.llenAsync(this.toKey('paused'));
return this.client.llen(this.toKey('paused'));
};

@@ -862,6 +879,6 @@

case 'LIST':
jobs = this.client.lrangeAsync(key, start, end);
jobs = this.client.lrange(key, start, end);
break;
case 'SET':
jobs = this.client.smembersAsync(key).then(function(jobIds) {
jobs = this.client.smembers(key).then(function(jobIds) {
// Can't set a range for smembers. So do the slice programatically instead.

@@ -877,3 +894,3 @@ // Note that redis ranges are inclusive, so handling for javascript accordingly

case 'ZSET':
jobs = this.client.zrangeAsync(key, start, end);
jobs = this.client.zrange(key, start, end);
break;

@@ -952,9 +969,14 @@ }

resolver = _.after(count, function(){
_this.removeListener('stalled', resolver);
_this.removeListener('completed', resolver);
_this.removeListener('failed', resolver);
clearInterval(_this.moveUnlockedJobsToWaitInterval);
resolve();
});
_this.on('stalled', resolver);
_this.on('completed', resolver);
_this.on('failed', resolver);
_this.startMoveUnlockedJobsToWait();
}

@@ -969,3 +991,3 @@ }, reject);

var getRedisVersion = function getRedisVersion(client){
return client.infoAsync().then(function(doc){
return client.info().then(function(doc){
var prefix = 'redis_version:';

@@ -972,0 +994,0 @@ var lines = doc.split('\r\n');

@@ -13,2 +13,3 @@ /**

var debuglog = require('debuglog')('bull');
var Redlock = require('redlock');

@@ -29,3 +30,3 @@ var cache = {};

args.unshift(sha);
return client.evalshaAsync.apply(client, args).catch(function(err){
return client.evalsha.apply(client, args).catch(function(err){
debuglog(err, script, err.stack);

@@ -47,3 +48,3 @@ if(err.code === 'NOSCRIPT'){

function cacheScript(client, hash, script){
cache[hash] = client.scriptAsync('LOAD', script);
cache[hash] = client.script('LOAD', script);
return cache[hash];

@@ -53,4 +54,7 @@ }

var scripts = {
isJobInList: function(client, listKey, jobId){
var script = [
_isJobInList: function(keyVar, argVar, operator) {
keyVar = keyVar || 'KEYS[1]';
argVar = argVar || 'ARGV[1]';
operator = operator || 'return';
return [
'local function item_in_list (list, item)',

@@ -64,7 +68,8 @@ ' for _, v in pairs(list) do',

'end',
'local items = redis.call("LRANGE", KEYS[1], 0, -1)',
'return item_in_list(items, ARGV[1])'
['local items = redis.call("LRANGE",', keyVar, ' , 0, -1)'].join(''),
[operator, ' item_in_list(items, ', argVar, ')'].join('')
].join('\n');
return execScript(client, 'isJobInList', script, 1, listKey, jobId).then(function(result){
},
isJobInList: function(client, listKey, jobId){
return execScript(client, 'isJobInList', this._isJobInList(), 1, listKey, jobId).then(function(result){
return result === 1;

@@ -144,3 +149,3 @@ });

// of all the specialized functions moveToComplete, etc.
move: function(job, token, src, target){
move: function(job, src, target){
// TODO: Depending on the source we should use LREM, SREM or ZREM.

@@ -167,11 +172,5 @@ // TODO: Depending on the target we should use LPUSH, SADD, etc.

'if redis.call("EXISTS", KEYS[3]) == 1 then', // Make sure job exists
' local lock = redis.call("GET", KEYS[4])',
' if (not lock) or (lock == ARGV[3]) then', // Makes sure we own the lock
' redis.call("LREM", KEYS[1], -1, ARGV[1])',
' redis.call("LREM", KEYS[1], -1, ARGV[1])',
target ? moveJob : deleteJob,
' redis.call("DEL", KEYS[4])',
' return 0',
' else',
' return -2',
' end',
' return 0',
'else',

@@ -192,7 +191,17 @@ ' return -1',

job.jobId,
job.returnvalue ? JSON.stringify(job.returnvalue) : '',
token
job.returnvalue ? JSON.stringify(job.returnvalue) : ''
];
return execScript.apply(scripts, args).then(function(result){
var returnLockOrErrorCode = function(lock) {
return lock ? execScript.apply(scripts, args) : -2;
};
var throwUnexpectedErrors = function(err) {
if (!(err instanceof Redlock.LockError)) {
throw err;
}
};
return job.takeLock(!!job.lock)
.then(returnLockOrErrorCode, throwUnexpectedErrors)
.then(function(result){
switch (result){

@@ -207,8 +216,9 @@ case -1:

throw new Error('Cannot get lock for job ' + job.jobId + ' when trying to move from ' + src);
default:
return job.releaseLock()
}
});
},
moveToCompleted: function(job, token, removeOnComplete){
return scripts.move(job, token, 'active', removeOnComplete ? void 0 : 'completed');
moveToCompleted: function(job, removeOnComplete){
return scripts.move(job, 'active', removeOnComplete ? void 0 : 'completed');
},

@@ -306,89 +316,49 @@

* @param {Job} job The job
* @param {Boolean=false} renew Whether to renew to lock, meaning it will assume the job
* @param {Boolean=false} renew Whether to renew to lock, meaning it will assume the job
* is already locked and just reset the lock expiration.
* @param {Boolean=false} ensureActive Ensures that the job is in the 'active' state.
*/
takeLock: function(queue, job, token, renew, ensureActive){
// Ensures that the lock doesn't exist, or if it does, that we own it.
var ensureOwnershipCall = [
'local prevLock = redis.call("GET", KEYS[1])',
'if (prevLock and prevLock ~= ARGV[1]) then',
' return 0',
'end'
].join('\n');
takeLock: function(queue, job, renew, ensureActive){
var lock = job.lock;
if (renew && !lock) {
throw new Error('Unable to renew nonexisting lock');
}
if (renew) {
return lock.extend(queue.LOCK_RENEW_TIME);
}
if (lock) {
return Promise.resolve(null);
}
// Ensures that the lock in the 'active' state.
var ensureActiveCall = [
// Note that while this is inefficient to run a O(n) traversal of the 'active' queue,
// it's highly likely that the job is within the first few elements of the active
// list at the time this call is used.
'local activeJobs = redis.call("LRANGE", KEYS[3], 0, -1)',
'local found = false',
'for _, job in ipairs(activeJobs) do',
' if(job == ARGV[3]) then',
' found = true',
' break',
' end',
'end',
'if (found == false) then',
' return 0',
'end'
].join('\n');
var lockCall;
if (renew){
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])';
var redlock;
if (ensureActive) {
var keyVar = ['"', job.queue.toKey('active'), '"'].join('');
var argVar = ['"', job.jobId, '"'].join('');
var isJobInList = this._isJobInList(keyVar, argVar, 'if');
var lockAcquired = ['and redis.call("HSET", "', queue.toKey(job.jobId), '", "lockAcquired", "1")'].join('');
var success = 'then return 1 else return 0 end';
var opts = {
lockScript: function(lockScript) {
return [
isJobInList,
lockScript.replace('return', 'and'),
lockAcquired,
success
].join('\n');
}
};
redlock = new Redlock(queue.clients, _.extend(opts, queue.redlock));
} else {
lockCall = 'redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "NX")';
redlock = new Redlock(queue.clients, queue.redlock);
}
var script = [
(renew ? ensureOwnershipCall : ''),
(ensureActive ? ensureActiveCall : ''),
'if(' + lockCall + ') then',
// Mark the job as having been locked at least once. Used to determine if the job was stalled.
' redis.call("HSET", KEYS[2], "lockAcquired", "1")',
' return 1',
'else',
' return 0',
'end'
].join('\n');
var args = [
queue.client,
'takeLock' + (renew ? 'Renew' : '') + (ensureActive ? 'EnsureActive' : ''),
script,
3,
job.lockKey(),
queue.toKey(job.jobId),
queue.toKey('active'),
token,
queue.LOCK_RENEW_TIME,
job.jobId
];
return execScript.apply(scripts, args);
return redlock.lock(job.lockKey(), queue.LOCK_RENEW_TIME);
},
releaseLock: function(job, token){
var script = [
'if redis.call("get", KEYS[1]) == ARGV[1]',
'then',
' return redis.call("del", KEYS[1])',
'else',
' return 0',
'end'].join('\n');
var args = [
job.queue.client,
'releaseLock',
script,
1,
job.lockKey(),
token
];
return execScript.apply(scripts, args).then(function(result){
return result === 1;
});
releaseLock: function(job){
var lock = job.lock;
if (!lock) {
throw new Error('Unable to release nonexisting lock');
}
return lock.unlock()
},

@@ -661,3 +631,3 @@ /**

return multi.execAsync().spread(function(waiting, paused){
return multi.exec().spread(function(waiting, paused){
var jobKeys = (paused.concat(waiting)).map(_this.toKey, _this);

@@ -669,3 +639,3 @@

multi.del.apply(multi, jobKeys);
return multi.execAsync();
return multi.exec();
}

@@ -672,0 +642,0 @@ });

@@ -6,3 +6,3 @@ /*eslint-env node */

var _ = require('lodash');
var uuid = require('node-uuid');
var uuid = require('uuid');

@@ -9,0 +9,0 @@ /**

{
"name": "bull",
"version": "1.1.3",
"version": "2.0.0",
"description": "Job manager",

@@ -23,6 +23,7 @@ "main": "index.js",

"disturbed": "^1.0.6",
"lodash": "^4.16.6",
"node-uuid": "^1.4.7",
"redis": "^2.6.3",
"semver": "^5.3.0"
"ioredis": "^2.4.0",
"lodash": "^4.17.2",
"redlock": "^2.1.0",
"semver": "^5.3.0",
"uuid": "^3.0.0"
},

@@ -29,0 +30,0 @@ "devDependencies": {

@@ -9,3 +9,3 @@ Bull Job Manager

<img src="https://image.freepik.com/free-icon/strong-bull-side-view_318-52710.jpg", width="200" />
<img src="https://image.freepik.com/free-icon/strong-bull-side-view_318-52710.jpg" width="200" />

@@ -12,0 +12,0 @@ The fastest, more reliable redis based queue for nodejs.

@@ -9,4 +9,3 @@ /*eslint-env node */

var expect = require('expect.js');
var Promise = require('bluebird');
var redis = require('redis');
var redis = require('ioredis');

@@ -24,4 +23,3 @@ var STD_QUEUE_NAME = 'cluster test queue';

var client = redis.createClient(6379, '127.0.0.1', {});
client = Promise.promisifyAll(client);
client.selectAsync(0);
client.select(0);

@@ -33,3 +31,3 @@ var script = [

return queue.client.evalAsync(
return queue.client.eval(
script,

@@ -51,14 +49,17 @@ 0,

var workers = [];
var worker;
var _i = 0;
for(_i; _i < os.cpus().length - 1; _i++) {
worker = cluster.fork();
worker.on('message', workerMessageHandlerWrapper);
workers.push(worker);
console.log('Worker spawned: #', worker.id);
}
describe('Cluster', function () {
var workers = [];
before(function() {
var worker;
var _i = 0;
for(_i; _i < os.cpus().length - 1; _i++) {
worker = cluster.fork();
worker.on('message', workerMessageHandlerWrapper);
workers.push(worker);
console.log('Worker spawned: #', worker.id);
}
});
var queue;

@@ -82,2 +83,6 @@

queue.on('stalled', function(job){
jobs.splice(jobs.indexOf(job.jobId), 1);
});
workerMessageHandler = function(job) {

@@ -164,3 +169,3 @@ jobs.push(job.id);

after(function() {
_i = 0;
var _i = 0;
for(_i; _i < workers.length; _i++) {

@@ -167,0 +172,0 @@ workers[_i].kill();

@@ -7,6 +7,4 @@ /*eslint-env node */

var sinon = require('sinon');
var redis = require('redis');
var Promise = require('bluebird');
var redis = require('ioredis');
Promise.promisifyAll(redis.RedisClient.prototype);

@@ -19,3 +17,3 @@ describe('connection', function () {

var client = redis.createClient();
return client.flushdbAsync().then(function(){
return client.flushdb().then(function(){
queue = utils.buildQueue();

@@ -31,4 +29,4 @@ });

jobDone();
// We do not wait since this close is expected to fail...
queue.close();
}).then(function() {
done();

@@ -35,0 +33,0 @@ }).catch(function(err){

@@ -7,8 +7,7 @@ /*eslint-env node */

var expect = require('expect.js');
var redis = require('redis');
var redis = require('ioredis');
var Promise = require('bluebird');
var uuid = require('node-uuid');
var uuid = require('uuid');
var Redlock = require('redlock');
Promise.promisifyAll(redis.RedisClient.prototype);
Promise.promisifyAll(redis.Multi.prototype);

@@ -20,3 +19,3 @@ describe('Job', function(){

var client = redis.createClient();
return client.flushdbAsync();
return client.flushdb();
});

@@ -29,2 +28,3 @@

afterEach(function(){
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT));
return queue.close();

@@ -52,2 +52,6 @@ });

it('should not modify input options', function() {
expect(opts).not.to.have.property('jobId');
});
it('saves the job in redis', function () {

@@ -102,8 +106,7 @@ return Job.fromId(queue, job.jobId).then(function(storedJob){

it('fails to remove a locked job', function() {
var token = uuid();
return Job.create(queue, 1, {foo: 'bar'}).then(function(job) {
return job.takeLock(token).then(function(lock) {
expect(lock).to.be(true);
return job.takeLock().then(function(lock) {
expect(lock).to.be.a(Redlock.Lock);
}).then(function() {
return job.remove(token);
return job.remove();
}).then(function() {

@@ -201,7 +204,7 @@ throw new Error('Should not be able to remove a locked job');

it('can take a lock', function(){
return job.takeLock('423').then(function(lockTaken){
expect(lockTaken).to.be(true);
return job.takeLock().then(function(lockTaken){
expect(lockTaken).to.be.a(Redlock.Lock);
}).then(function(){
return job.releaseLock('321').then(function(lockReleased){
expect(lockReleased).to.be(false);
return job.releaseLock().then(function(lockReleased){
expect(lockReleased).to.not.exist;
});

@@ -212,6 +215,6 @@ });

it('cannot take an already taken lock', function(){
return job.takeLock('1234').then(function(lockTaken){
expect(lockTaken).to.be(true);
return job.takeLock().then(function(lockTaken){
expect(lockTaken).to.be.a(Redlock.Lock);
}).then(function(){
return job.takeLock('1234').then(function(lockTaken){
return job.takeLock().then(function(lockTaken){
expect(lockTaken).to.be(false);

@@ -223,7 +226,7 @@ });

it('can renew a previously taken lock', function(){
return job.takeLock('1235').then(function(lockTaken){
expect(lockTaken).to.be(true);
return job.takeLock().then(function(lockTaken){
expect(lockTaken).to.be.a(Redlock.Lock);
}).then(function(){
return job.renewLock('1235').then(function(lockRenewed){
expect(lockRenewed).to.be(true);
return job.renewLock().then(function(lockRenewed){
expect(lockRenewed).to.be.a(Redlock.Lock);
});

@@ -234,12 +237,8 @@ });

it('can release a lock', function(){
return job.takeLock('1237').then(function(lockTaken){
expect(lockTaken).to.be(true);
return job.takeLock().then(function(lockTaken){
expect(lockTaken).to.be.a(Redlock.Lock);
}).then(function(){
return job.releaseLock('321').then(function(lockReleased){
expect(lockReleased).to.be(false);
return job.releaseLock().then(function(lockReleased){
expect(lockReleased).to.not.exist;
});
}).then(function(){
return job.releaseLock('1237').then(function(lockReleased){
expect(lockReleased).to.be(true);
});
});

@@ -393,3 +392,3 @@ });

var client = Promise.promisifyAll(redis.createClient());
var client = redis.createClient();
return Job.create(queue, {foo: 'baz'}).then(function(job) {

@@ -409,3 +408,3 @@ return job.isStuck().then(function(isStuck) {

expect(state).to.be('completed');
return client.sremAsync(queue.toKey('completed'), job.jobId);
return client.srem(queue.toKey('completed'), job.jobId);
}).then(function(){

@@ -420,3 +419,3 @@ return job.moveToDelayed(Date.now() + 10000);

expect(state).to.be('delayed');
return client.zremAsync(queue.toKey('delayed'), job.jobId);
return client.zrem(queue.toKey('delayed'), job.jobId);
}).then(function() {

@@ -431,3 +430,3 @@ return job.moveToFailed(new Error('test'));

expect(state).to.be('failed');
return client.sremAsync(queue.toKey('failed'), job.jobId);
return client.srem(queue.toKey('failed'), job.jobId);
}).then(function(res) {

@@ -438,5 +437,5 @@ expect(res).to.be(1);

expect(state).to.be('stuck');
return client.rpopAsync(queue.toKey('wait'));
return client.rpop(queue.toKey('wait'));
}).then(function(){
return client.lpushAsync(queue.toKey('paused'), job.jobId);
return client.lpush(queue.toKey('paused'), job.jobId);
}).then(function() {

@@ -449,5 +448,5 @@ return job.isPaused();

expect(state).to.be('paused');
return client.rpopAsync(queue.toKey('paused'));
return client.rpop(queue.toKey('paused'));
}).then(function() {
return client.lpushAsync(queue.toKey('wait'), job.jobId);
return client.lpush(queue.toKey('wait'), job.jobId);
}).then(function() {

@@ -454,0 +453,0 @@ return job.isWaiting();

@@ -10,4 +10,4 @@ /// <reference path='../typings/mocha/mocha.d.ts'/>

var _ = require('lodash');
var uuid = require('node-uuid');
var redis = require('redis');
var uuid = require('uuid');
var redis = require('ioredis');

@@ -31,3 +31,3 @@ var STD_QUEUE_NAME = 'test queue';

var client = redis.createClient();
return client.flushdbAsync();
return client.flushdb();
});

@@ -34,0 +34,0 @@

@@ -7,10 +7,8 @@ /*eslint-env node */

var Promise = require('bluebird');
var redis = require('redis');
var redis = require('ioredis');
var sinon = require('sinon');
var _ = require('lodash');
var uuid = require('node-uuid');
var uuid = require('uuid');
var utils = require('./utils');
Promise.promisifyAll(redis.RedisClient.prototype);
Promise.promisifyAll(redis.Multi.prototype);

@@ -36,3 +34,3 @@ /*

var client = redis.createClient();
return client.flushdbAsync();
return client.flushdb();
});

@@ -74,10 +72,10 @@

it('should resolve the promise when each client has disconnected', function () {
expect(testQueue.client.connected).to.be(true);
expect(testQueue.bclient.connected).to.be(true);
expect(testQueue.eclient.connected).to.be(true);
expect(testQueue.client.status).to.be('ready');
expect(testQueue.bclient.status).to.be('ready');
expect(testQueue.eclient.status).to.be('ready');
return testQueue.close().then(function () {
expect(testQueue.client.connected).to.be(false);
expect(testQueue.bclient.connected).to.be(false);
expect(testQueue.eclient.connected).to.be(false);
expect(testQueue.client.status).to.be('end');
expect(testQueue.bclient.status).to.be('end');
expect(testQueue.eclient.status).to.be('end');
});

@@ -94,2 +92,3 @@ });

it('should close if the job expires after the LOCK_RENEW_TIME', function (done) {
this.timeout(testQueue.STALLED_JOB_CHECK_INTERVAL * 2);
testQueue.LOCK_RENEW_TIME = 10;

@@ -145,10 +144,10 @@ testQueue.process(function () {

queue.once('ready', function () {
expect(queue.client.connection_options.host).to.be('127.0.0.1');
expect(queue.bclient.connection_options.host).to.be('127.0.0.1');
expect(queue.client.options.host).to.be('127.0.0.1');
expect(queue.bclient.options.host).to.be('127.0.0.1');
expect(queue.client.connection_options.port).to.be(6379);
expect(queue.bclient.connection_options.port).to.be(6379);
expect(queue.client.options.port).to.be(6379);
expect(queue.bclient.options.port).to.be(6379);
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
expect(queue.client.options.db).to.be(0);
expect(queue.bclient.options.db).to.be(0);

@@ -163,10 +162,10 @@ queue.close().then(done);

queue.once('ready', function () {
expect(queue.client.connection_options.host).to.be('127.0.0.1');
expect(queue.bclient.connection_options.host).to.be('127.0.0.1');
expect(queue.client.options.host).to.be('127.0.0.1');
expect(queue.bclient.options.host).to.be('127.0.0.1');
expect(queue.client.connection_options.port).to.be(6379);
expect(queue.bclient.connection_options.port).to.be(6379);
expect(queue.client.options.port).to.be(6379);
expect(queue.bclient.options.port).to.be(6379);
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
expect(queue.client.options.db).to.be(0);
expect(queue.bclient.options.db).to.be(0);

@@ -182,10 +181,10 @@ queue.close().then(done);

queue.once('ready', function () {
expect(queue.client.connection_options.host).to.be('127.0.0.1');
expect(queue.bclient.connection_options.host).to.be('127.0.0.1');
expect(queue.client.options.host).to.be('127.0.0.1');
expect(queue.bclient.options.host).to.be('127.0.0.1');
expect(queue.client.connection_options.port).to.be(6379);
expect(queue.bclient.connection_options.port).to.be(6379);
expect(queue.client.options.port).to.be(6379);
expect(queue.bclient.options.port).to.be(6379);
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
expect(queue.client.condition.select).to.be(0);
expect(queue.bclient.condition.select).to.be(0);

@@ -201,10 +200,10 @@ queue.close().then(done);

queue.once('ready', function () {
expect(queue.client.connection_options.host).to.be('127.0.0.1');
expect(queue.bclient.connection_options.host).to.be('127.0.0.1');
expect(queue.client.options.host).to.be('127.0.0.1');
expect(queue.bclient.options.host).to.be('127.0.0.1');
expect(queue.client.connection_options.port).to.be(6379);
expect(queue.bclient.connection_options.port).to.be(6379);
expect(queue.client.options.port).to.be(6379);
expect(queue.bclient.options.port).to.be(6379);
expect(queue.client.selected_db).to.be(1);
expect(queue.bclient.selected_db).to.be(1);
expect(queue.client.options.db).to.be(1);
expect(queue.bclient.options.db).to.be(1);

@@ -219,7 +218,7 @@ queue.close().then(done);

queue.once('ready', function () {
expect(queue.client.connection_options.host).to.be('localhost');
expect(queue.bclient.connection_options.host).to.be('localhost');
expect(queue.client.options.host).to.be('localhost');
expect(queue.bclient.options.host).to.be('localhost');
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
expect(queue.client.options.db).to.be(0);
expect(queue.bclient.options.db).to.be(0);

@@ -280,3 +279,3 @@ queue.close().then(done);

var client = redis.createClient();
return client.flushdbAsync().then(function () {
return client.flushdb().then(function () {
return utils.newQueue();

@@ -289,2 +288,3 @@ }).then(function (_queue) {

afterEach(function () {
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT));
return utils.cleanupQueues();

@@ -298,3 +298,3 @@ });

done();
});
}).catch(done);
queue.add({ foo: 'bar' }).then(function (job) {

@@ -406,3 +406,3 @@ expect(job.jobId).to.be.ok();

expect(job.returnvalue).to.be.eql(37);
queue.client.hgetAsync(queue.toKey(job.jobId), 'returnvalue').then(function (retval) {
queue.client.hget(queue.toKey(job.jobId), 'returnvalue').then(function (retval) {
expect(JSON.parse(retval)).to.be.eql(37);

@@ -484,7 +484,11 @@ done();

return queueStalled.close(true).then(function () {
return new Promise(function (resolve) {
return new Promise(function (resolve, reject) {
utils.newQueue('test queue stalled').then(function (queue2) {
queue2.LOCK_RENEW_TIME = 100;
var doneAfterFour = _.after(4, function () {
expect(stalledCallback.calledOnce).to.be(true);
try {
expect(stalledCallback.calledOnce).to.be(true);
} catch (e) {
reject(e);
}
resolve();

@@ -610,3 +614,3 @@ });

setTimeout(jobDone, 500);
});
}).catch(done);

@@ -826,3 +830,3 @@ utils.newQueue().then(function (_anotherQueue) {

var client = redis.createClient();
return client.flushdbAsync();
return client.flushdb();
});

@@ -891,3 +895,3 @@

it('should pause the queue locally', function (testDone) {
it('should pause the queue locally', function (done) {
var counter = 2;

@@ -897,14 +901,14 @@

queue.pause(true /* Local */).then(function () {
queue.pause(true /* Local */).tap(function () {
// Add the worker after the queue is in paused mode since the normal behavior is to pause
// it after the current lock expires. This way, we can ensure there isn't a lock already
// to test that pausing behavior works.
queue.process(function (job, done) {
queue.process(function (job, jobDone) {
expect(queue.paused).not.to.be.ok();
done();
jobDone();
counter--;
if (counter === 0) {
queue.close().then(testDone);
queue.close().then(done);
}
});
}).catch(done);
}).then(function () {

@@ -918,3 +922,3 @@ return queue.add({ foo: 'paused' });

return queue.resume(true /* Local */);
});
}).catch(done);
});

@@ -934,3 +938,3 @@

Promise.all(jobs).then(function () {
queue.pause(true).then(function () {
return queue.pause(true).then(function () {
var active = queue.getJobCountByTypes(['active']).then(function (count) {

@@ -967,3 +971,3 @@ expect(count).to.be(0);

});
});
}).catch(done);
});

@@ -1006,3 +1010,3 @@ });

var client = redis.createClient();
return client.flushdbAsync();
return client.flushdb();
});

@@ -1055,3 +1059,4 @@

queue.on('failed', function (err) {
queue.on('failed', function (job, err) {
err.job = job;
done(err);

@@ -1169,6 +1174,7 @@ });

queue = utils.buildQueue();
return client.flushdbAsync();
return client.flushdb();
});
afterEach(function () {
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT));
return queue.close();

@@ -1279,2 +1285,3 @@ });

afterEach(function () {
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT));
return queue.close();

@@ -1507,3 +1514,3 @@ });

describe.only('Jobs getters', function () {
describe('Jobs getters', function () {
var queue;

@@ -1517,2 +1524,3 @@

afterEach(function () {
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT));
return queue.close();

@@ -1648,2 +1656,3 @@ });

afterEach(function () {
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT));
return queue.close();

@@ -1760,2 +1769,3 @@ });

afterEach(function () {
this.timeout(queue.STALLED_JOB_CHECK_INTERVAL * (1 + queue.MAX_STALLED_JOB_COUNT));
return queue.close();

@@ -1762,0 +1772,0 @@ });

@@ -37,3 +37,5 @@ /*eslint-env node */

return Promise.map(queues, function(queue){
return queue.close();
var errHandler = function() {};
queue.on('error', errHandler);
return queue.close().catch(errHandler);
}).then(function(){

@@ -40,0 +42,0 @@ queues = [];

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc