Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 2.1.2 to 2.2.0

11

CHANGELOG.md

@@ -0,1 +1,12 @@

v.2.2.0
=======
- Much improved priority queues, simpler, faster and more reliable.
- Fixed issue where lua scripts where leaking memory.
- Improvements in local pause, fixing #446 and #447.
- Fix to increase delay time over 24 days #244
[Changes](https://github.com/OptimalBits/bull/compare/v2.1.2...v2.2.0)
v.2.1.2

@@ -2,0 +13,0 @@ =======

7

lib/job.js

@@ -40,5 +40,10 @@ /*eslint-env node */

function addJob(queue, job){
var opts = job.opts;
var jobData = job.toData();
var toKey = _.bind(queue.toKey, queue);
return scripts.addJob(queue.client, toKey, jobData, { lifo: job.opts.lifo, customJobId: job.opts.jobId });
return scripts.addJob(queue.client, toKey, jobData, {
lifo: opts.lifo,
customJobId: opts.jobId,
priority: opts.priority
});
}

@@ -45,0 +50,0 @@

@@ -20,2 +20,4 @@ "use strict";

console.warn("DEPRECATION NOTICE: PriorityQueue has been deprecated and will be removed in bull 3.0.0, please use the priority option instead.");
var _this = this;

@@ -22,0 +24,0 @@ this.paused = false;

125

lib/queue.js

@@ -17,14 +17,15 @@ /*eslint-env node */

/**
Gets or creates a new Queue with the given name.
The Queue keeps 5 data structures:
The Queue keeps 6 data structures:
- wait (list)
- active (list)
- delayed (zset)
- priority (zset)
- completed (set)
- failed (set)
-- >completed
/
--> priorities -- >completed
/ /
job -> wait -> active

@@ -40,3 +41,3 @@ | ^ \

The mechanism is simple, a delayedTimestamp variable holds the next
known timestamp that is on the delayed set (or MAX_INT if none).
known timestamp that is on the delayed set (or MAX_TIMEOUT_MS if none).

@@ -64,2 +65,4 @@ When the current job has finalized the variable is checked, if

var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed
var Queue = function Queue(name, redisPort, redisHost, redisOptions){

@@ -101,5 +104,5 @@ if(!(this instanceof Queue)){

if(_.isFunction(redisOptions.createClient)){
client = redisOptions.createClient();
client = new redisOptions();
}else{
client = redis.createClient(redisPort, redisHost, redisOptions);
client = new redis(redisPort, redisHost, redisOptions);
}

@@ -156,2 +159,3 @@ return client;

this.processing = 0;
this.retrieving = 0;

@@ -269,2 +273,10 @@ this.LOCK_RENEW_TIME = LOCK_RENEW_TIME;

Queue.prototype.getJobMoveCount = function(){
return this.bclient.commandQueue.length;
};
Queue.prototype.whenCurrentMoveFinished = function(){
var currentMove = this.bclient.commandQueue.peekFront()
return currentMove && currentMove.command.promise || Promise.resolve();
};
/**

@@ -324,2 +336,3 @@ *

return this.closing = this._initializing.then(function(){

@@ -555,3 +568,3 @@ clearTimeout(_this.delayTimer);

if(newDelayedTimestamp < _this.delayedTimestamp){
if(newDelayedTimestamp < _this.delayedTimestamp && newDelayedTimestamp < (MAX_TIMEOUT_MS + Date.now())){
clearTimeout(this.delayTimer);

@@ -571,3 +584,3 @@ this.delayedTimestamp = newDelayedTimestamp;

_this.updateDelayTimer(nextTimestamp);
}).catch(function(err){
}).catch(function(err){
console.error('Error updating the delay timer', err);

@@ -716,4 +729,2 @@ });

this.processing++;
return lockRenewer().then(function(locked){

@@ -739,4 +750,20 @@ if(locked){

Queue.prototype.getNextJob = function(opts){
var _this = this;
if(!this.closing){
return this.moveJob('wait', 'active', opts).then(this.getJobFromId);
this.retrieving++;
return this.moveJob('wait', 'active', opts)
.then(this.getJobFromId)
.tap(function(job) {
_this.retrieving--;
if (job) {
_this.processing++;
} else {
_this.emit('no-job-retrieved');
}
})
.catch(function(err) {
_this.retrieving--;
_this.emit('no-job-retrieved');
throw err;
});
}else{

@@ -757,23 +784,40 @@ return Promise.reject();

Queue.prototype.moveJob = function(src, dst, opts) {
var args = arguments;
var _this = this;
var move;
if(opts && opts.block === false){
if(!this.closing){
return this.bclient.rpoplpush(this.toKey(src), this.toKey(dst));
move = this.bclient.rpoplpush(this.toKey(src), this.toKey(dst));
}else{
return Promise.reject();
move = Promise.reject();
}
} else if (this.closing || this.paused) {
move = Promise.resolve();
} else if (this.getJobMoveCount()) {
move = this.whenCurrentMoveFinished().then(function() {
return _this.moveJob.apply(_this, args);
});
}else{
return this.bclient.brpoplpush(
move = this.bclient.brpoplpush(
this.toKey(src),
this.toKey(dst),
Math.floor(this.LOCK_RENEW_TIME / 1000)).then(function(jobId) {
// Return undefined instead of Promise.reject if there is no jobId
// Avoid Promise.reject because https://github.com/OptimalBits/bull/issues/144
Math.floor(this.LOCK_RENEW_TIME / 1000));
}
return move.then(function(jobId){
//
// Unfortunatelly this cannot be performed atomically, which will lead to a
// slight hazard for priority queues (will only affect its order).
//
if(jobId){
return _this.client.zrem(_this.toKey('priority'), jobId).then(function(){
return jobId;
}, function(err){
if(!_this.closing){
return err;
}
});
}
}
}, function(err){
if(!_this.closing){
throw err;
}
});
};

@@ -963,22 +1007,23 @@

var resolver;
return new Promise(function(resolve, reject) {
_this.getActiveCount().then(function(count) {
if(count === 0){
var count = this.processing + this.retrieving;
return new Promise(function(resolve) {
if(count === 0){
resolve();
}else{
resolver = _.after(count, function(){
_this.removeListener('stalled', resolver);
_this.removeListener('completed', resolver);
_this.removeListener('failed', resolver);
_this.removeListener('no-job-retrieved', resolver);
clearInterval(_this.moveUnlockedJobsToWaitInterval);
resolve();
}else{
resolver = _.after(count, function(){
_this.removeListener('stalled', resolver);
_this.removeListener('completed', resolver);
_this.removeListener('failed', resolver);
clearInterval(_this.moveUnlockedJobsToWaitInterval);
resolve();
});
});
_this.on('stalled', resolver);
_this.on('completed', resolver);
_this.on('failed', resolver);
_this.on('stalled', resolver);
_this.on('completed', resolver);
_this.on('failed', resolver);
_this.on('no-job-retrieved', resolver);
_this.startMoveUnlockedJobsToWait();
}
}, reject);
_this.startMoveUnlockedJobsToWait();
}
});

@@ -985,0 +1030,0 @@ };

@@ -25,18 +25,28 @@ /**

function isCommandDefined(client, hash){
return !!client[hash];
}
var scripts = {
_isJobInList: function(keyVar, argVar, operator) {
keyVar = keyVar || 'KEYS[1]';
argVar = argVar || 'ARGV[1]';
operator = operator || 'return';
return [
'local function item_in_list (list, item)',
' for _, v in pairs(list) do',
' if v == item then',
' return 1',
' end',
' end',
' return nil',
'end',
['local items = redis.call("LRANGE",', keyVar, ' , 0, -1)'].join(''),
[operator, ' item_in_list(items, ', argVar, ')'].join('')
keyVar = keyVar ? 'splitKey[1]..":"..splitKey[2]..":active"' : 'KEYS[1]';
argVar = argVar || 'ARGV[1]';
operator = operator || 'return';
return [
'local function item_in_list (list, item)',
' for _, v in pairs(list) do',
' if v == item then',
' return 1',
' end',
' end',
' return nil',
'end',
'local splitKey={}',
'local i=1',
'for str in string.gmatch(KEYS[1], "([^:]+)") do',
'splitKey[i] = str',
'i = i + 1',
'end',
[ 'local items = redis.call("LRANGE", ', keyVar, ' , 0, -1)' ].join(''),
[ operator, ' item_in_list(items, ', argVar, ')' ].join('')
].join('\n');

@@ -50,8 +60,25 @@ },

addJob: function(client, toKey, job, opts){
var delayed;
var scriptName;
opts = opts || {};
opts.lifo = !!(opts.lifo);
var delayTimestamp = job.timestamp + job.delay;
if(job.delay && delayTimestamp > Date.now()){
delayed = true;
scriptName = 'addJob:delayed';
} else {
scriptName = 'addJob'+(opts.lifo?':lifo':'') + (opts.priority?':priority':'');
}
/*
if(isCommandDefined(client, scriptName)){
return client[scriptName].apply(client, args);
};
*/
var jobArgs = _.flatten(_.toPairs(job));
var keys = _.map(['wait', 'paused', 'meta-paused', 'jobs', 'id', 'delayed'], function(name){
var keys = _.map(['wait', 'paused', 'meta-paused', 'jobs', 'id', 'delayed', 'priority'], function(name){
return toKey(name);

@@ -62,3 +89,3 @@ });

var argvs = _.map(jobArgs, function(arg, index){
return ', ARGV['+(index+3)+']';
return ', ARGV['+(index+4)+']';
})

@@ -75,8 +102,6 @@

var scriptName;
var delayTimestamp = job.timestamp + job.delay;
if(job.delay && delayTimestamp > Date.now()){
if(delayed){
script.push.apply(script, [
' local timestamp = tonumber(ARGV[' + (argvs.length + 3) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)',
' local timestamp = tonumber(ARGV[' + (argvs.length + 4) + ']) * 0x1000 + bit.band(jobCounter, 0xfff)',
' redis.call("ZADD", KEYS[6], timestamp, jobId)',

@@ -86,6 +111,32 @@ ' redis.call("PUBLISH", KEYS[6], (timestamp / 0x1000))',

]);
}else{
var push, pushPaused;
var add = _.template('redis.call("<%= direction %>", <%= waitQueue %>, jobId)');
scriptName = 'addJob:delayed';
}else{
var push = (opts.lifo ? 'R' : 'L') + 'PUSH';
if(opts.lifo){
push = add({direction: 'RPUSH', waitQueue: 'KEYS[1]'});
pushPaused = add({direction: 'RPUSH', waitQueue: 'KEYS[2]'});
}else if(opts.priority){
script.push.apply(script, [
' redis.call("ZADD", KEYS[7], ARGV[3], jobId)',
' local count = redis.call("ZCOUNT", KEYS[7], 0, ARGV[3])',
]);
var priorityAdd = _.template([
' local len = redis.call("LLEN", <%= waitQueue %>)',
' local id = redis.call("LINDEX", <%= waitQueue %>, len - (count-1))',
' if id then',
' redis.call("LINSERT", <%= waitQueue %>, "BEFORE", id, jobId)',
' else',
' redis.call("RPUSH", <%= waitQueue %>, jobId)',
' end',
].join('\n'));
push = priorityAdd({waitQueue: 'KEYS[1]'});
pushPaused = priorityAdd({waitQueue: 'KEYS[2]'});
}else{
push = add({direction: 'LPUSH', waitQueue: 'KEYS[1]'});
pushPaused = add({direction: 'LPUSH', waitQueue: 'KEYS[2]'});
}
//

@@ -96,5 +147,5 @@ // Whe check for the meta-paused key to decide if we are paused or not

'if redis.call("EXISTS", KEYS[3]) ~= 1 then',
' redis.call("' + push + '", KEYS[1], jobId)',
push,
'else',
' redis.call("' + push + '", KEYS[2], jobId)',
pushPaused,
'end',

@@ -104,4 +155,2 @@ 'redis.call("PUBLISH", KEYS[4], jobId)',

]);
scriptName = 'addJob'+push;
}

@@ -119,2 +168,3 @@

args.push(opts.customJobId || '');
args.push(opts.priority);
args.push.apply(args, jobArgs);

@@ -125,2 +175,3 @@ args.push(delayTimestamp);

},
// TODO: perfect this function so that it can be used instead

@@ -312,3 +363,3 @@ // of all the specialized functions moveToComplete, etc.

var isJobInList = this._isJobInList(keyVar, argVar, 'if');
var lockAcquired = ['and redis.call("HSET", "', queue.toKey(job.jobId), '", "lockAcquired", "1")'].join('');
var lockAcquired = 'and redis.call("HSET", splitKey[1]..":"..splitKey[2]..":"..splitKey[3], "lockAcquired", "1")';
var success = 'then return 1 else return 0 end';

@@ -315,0 +366,0 @@ var opts = {

{
"name": "bull",
"version": "2.1.2",
"version": "2.2.0",
"description": "Job manager",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -349,3 +349,3 @@ Bull Job Manager

redisHost {String} A host specified as IP or domain where redis is running.
redisOptions {Object} Options to pass to the redis client. https://github.com/mranney/node_redis
redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
```

@@ -360,3 +360,3 @@

redisConnectionString {String} A connection string containing the redis server host, port and (optional) authentication.
redisOptions {Object} Options to pass to the redis client. https://github.com/mranney/node_redis
redisOptions {Object} Options to pass to the redis client. https://github.com/luin/ioredis/blob/master/API.md#new-redisport-host-options
```

@@ -436,3 +436,7 @@

{
priority {Number} Optional priority value, ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that
using priorities has a slight impact on performance, so do not use if not required.
delay {Number} An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both server and clients should have their clocks synchronized. [optional]
attempts {Number} The total number of attempts to try the job until it completes.

@@ -442,8 +446,10 @@

backoff.type {String} Backoff type, which can be either `fixed` or `exponential`
backoff.delay {Number} Backoff delay, in milliseconds
backoff.delay {Number} Backoff delay, in milliseconds.
lifo {Boolean} A boolean which, if true, adds the job to the right of the queue
instead of the left (default false)
timeout {Number} The number of milliseconds after which the job should be fail
with a timeout error [optional]
jobId {Number|String} Override the job ID - by default, the job ID is a unique

@@ -454,2 +460,3 @@ integer, but you can use this setting to override it.

already exists, it will not be added.
removeOnComplete {Boolean} A boolean which, if true, removes the job when it successfully

@@ -646,6 +653,10 @@ completes. Default behavior is to keep the job in the completed queue.

<a name="priorityQueue"/>
###PriorityQueue(queueName, redisPort, redisHost, [redisOpts])
### DEPRECATION notice
The priority queue has been deprecated since version 2.2.0 in favor of a new option, *priority* in [Queue##add](#add).
The priorityQueue will be removed from the code base in version 3.0.0.
--
This is the Queue constructor of priority queue. It works same a normal queue, with same function and parameters.

@@ -681,3 +692,3 @@ The only difference is that the Queue#add() allow an options opts.priority that could take

The most important property for the user is Job##data that includes the
object that was passed to Queue##add, and that is normally used to
object that was passed to [Queue##add](#add), and that is normally used to
perform the job.

@@ -684,0 +695,0 @@

@@ -21,3 +21,3 @@ /*eslint-env node */

// we need to purge all keys after each test
var client = redis.createClient(6379, '127.0.0.1', {});
var client = new redis(6379, '127.0.0.1', {});
client.select(0);

@@ -24,0 +24,0 @@

@@ -15,3 +15,3 @@ /*eslint-env node */

beforeEach(function(){
var client = redis.createClient();
var client = new redis();
return client.flushdb().then(function(){

@@ -18,0 +18,0 @@ queue = utils.buildQueue();

@@ -17,3 +17,3 @@ /*eslint-env node */

beforeEach(function(){
var client = redis.createClient();
var client = new redis();
return client.flushdb();

@@ -388,3 +388,3 @@ });

var client = redis.createClient();
var client = new redis();
return Job.create(queue, {foo: 'baz'}).then(function(job) {

@@ -391,0 +391,0 @@ return job.isStuck().then(function(isStuck) {

@@ -29,3 +29,3 @@ /// <reference path='../typings/mocha/mocha.d.ts'/>

beforeEach(function(){
var client = redis.createClient();
var client = new redis();
return client.flushdb();

@@ -50,3 +50,3 @@ });

clients++;
return redis.createClient();
return new redis();
}

@@ -825,3 +825,3 @@ }

it('should clean a job without a timestamp', function (done) {
var client = redis.createClient(6379, '127.0.0.1', {});
var client = new redis(6379, '127.0.0.1', {});

@@ -828,0 +828,0 @@ queue.add({some: 'data'}, {priority: 'normal'});

@@ -32,3 +32,3 @@ /*eslint-env node */

beforeEach(function () {
var client = redis.createClient();
var client = new redis();
return client.flushdb();

@@ -270,3 +270,3 @@ });

beforeEach(function () {
var client = redis.createClient();
var client = new redis();
return client.flushdb().then(function () {

@@ -323,2 +323,45 @@ return utils.newQueue();

it('should processes jobs by priority', function(done){
var normalPriority = [],
mediumPriority = [],
highPriority = [];
// for the current strategy this number should not exceed 8 (2^2*2)
// this is done to maitain a deterministic output.
var numJobsPerPriority = 6;
for(var i = 0; i < numJobsPerPriority; i++){
normalPriority.push(queue.add({p: 2}, {priority: 2}));
mediumPriority.push(queue.add({p: 3}, {priority: 3}));
highPriority.push(queue.add({p: 1}, {priority: 1}));
}
// wait for all jobs to enter the queue and then start processing
Promise
.all(normalPriority, mediumPriority, highPriority)
.then(function(){
var currentPriority = 1;
var counter = 0;
var total = 0;
queue.process(function(job, jobDone){
expect(job.jobId).to.be.ok();
expect(job.data.p).to.be(currentPriority);
jobDone();
total ++;
if(++counter === numJobsPerPriority){
currentPriority++;
counter = 0;
if(currentPriority === 4 && total === numJobsPerPriority * 3){
done();
}
}
});
}, done);
});
it('process several jobs serially', function (done) {

@@ -635,3 +678,3 @@ this.timeout(12000);

jobDone();
var client = redis.createClient();
var client = new redis();
client.srem(queue2.toKey('completed'), 1);

@@ -736,3 +779,3 @@ client.lpush(queue2.toKey('active'), 1);

it('does not renew a job lock after the lock has been released [#397]', function (done) {
this.timeout(queue.LOCK_RENEW_TIME * 3);
this.timeout(queue.LOCK_RENEW_TIME * 4);

@@ -768,3 +811,3 @@ queue.process(function (job) {

var retryQueue = utils.buildQueue('retry-test-queue');
var client = redis.createClient(6379, '127.0.0.1', {});
var client = new redis(6379, '127.0.0.1', {});

@@ -846,3 +889,3 @@ client.select(0);

beforeEach(function () {
var client = redis.createClient();
var client = new redis();
return client.flushdb();

@@ -987,6 +1030,81 @@ });

});
it('should pause the queue locally when more than one worker is active', function () {
var queue1 = utils.buildQueue('pause-queue');
var queue1IsProcessing = new Promise(function(resolve) {
queue1.process(function(job, jobDone) {
resolve();
setTimeout(jobDone, 200);
});
});
var queue2 = utils.buildQueue('pause-queue');
var queue2IsProcessing = new Promise(function(resolve) {
queue2.process(function(job, jobDone) {
resolve();
setTimeout(jobDone, 200);
});
});
queue1.add(1);
queue1.add(2);
queue1.add(3);
queue1.add(4);
return Promise.all([queue1IsProcessing, queue2IsProcessing]).then(function() {
return Promise.all([queue1.pause(true /* local */), queue2.pause(true /* local */)]).then(function() {
var active = queue1.getJobCountByTypes(['active']).then(function(count) {
expect(count).to.be(0);
});
var pending = queue1.getJobCountByTypes(['wait']).then(function(count) {
expect(count).to.be(2);
});
var completed = queue1.getJobCountByTypes(['completed']).then(function(count) {
expect(count).to.be(2);
});
return Promise.all([active, pending, completed]);
});
});
});
it('should wait for blocking job retrieval to complete before pausing locally', function() {
var queue = utils.buildQueue();
queue.process(function(job, jobDone) {
setTimeout(jobDone, 200);
});
return new Promise(function(resolve) {
queue.on('ready', resolve);
}).then(function() {
//start the pause process
var queueIsPaused = queue.pause(true);
//add some jobs
return Promise.all([ queue.add(1), queue.add(2) ]).then(function() {
//wait for the queue to finish pausing
return queueIsPaused;
});
}).then(function() {
var active = queue.getJobCountByTypes(['active']).then(function(count) {
expect(count).to.be(0);
});
var pending = queue.getJobCountByTypes(['wait']).then(function(count) {
expect(count).to.be(1);
});
var completed = queue.getJobCountByTypes(['completed']).then(function(count) {
expect(count).to.be(1);
});
return Promise.all([active, pending, completed]);
});
});
});
it('should publish a message when a new message is added to the queue', function (done) {
var client = redis.createClient(6379, '127.0.0.1', {});
var client = new redis(6379, '127.0.0.1', {});
client.select(0);

@@ -1022,3 +1140,3 @@ var queue = new Queue('test pub sub');

beforeEach(function () {
var client = redis.createClient();
var client = new redis();
return client.flushdb();

@@ -1030,3 +1148,3 @@ });

queue = new Queue('delayed queue simple');
var client = redis.createClient(6379, '127.0.0.1', {});
var client = new redis(6379, '127.0.0.1', {});
var timestamp = Date.now();

@@ -1056,3 +1174,3 @@ var publishHappened = false;

expect(publishHappened).to.be(true);
queue.close().then(done, done);
queue.close(true).then(done, done);
});

@@ -1148,3 +1266,3 @@ });

it('should process delayed jobs with exact same timestamps in correct order (FIFO)', function (done) {
var client = redis.createClient(6379, '127.0.0.1', {});
var client = new redis(6379, '127.0.0.1', {});
client = Promise.promisifyAll(client);

@@ -1187,3 +1305,3 @@ var QUEUE_NAME = 'delayed queue multiple' + uuid();

beforeEach(function () {
var client = redis.createClient();
var client = new redis();
queue = utils.buildQueue();

@@ -1930,3 +2048,3 @@ return client.flushdb();

it('should clean a job without a timestamp', function (done) {
var client = redis.createClient(6379, '127.0.0.1', {});
var client = new redis(6379, '127.0.0.1', {});

@@ -1933,0 +2051,0 @@ queue.add({ some: 'data' });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc