Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 1.0.0-rc1 to 1.0.0-rc2

chains and topics.md

21

lib/job.js
/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -9,2 +8,3 @@

var debuglog = require('debuglog')('bull');
var uuid = require('node-uuid');

@@ -285,7 +285,20 @@ /**

*/
Job.prototype.remove = function(){
Job.prototype.remove = function(token){
var queue = this.queue;
var job = this;
return scripts.remove(queue, this.jobId).then(function(){
queue.emit('removed', job);
var token = token || uuid();
return job.takeLock(token).then(function(lock) {
if (!lock) {
throw new Error('Could not get lock for job: ' + job.jobId + '. Cannot remove job.');
}
return scripts.remove(queue, job.jobId)
.then(function() {
queue.emit('removed', job);
})
.finally(function () {
return job.releaseLock(token);
});
});

@@ -292,0 +305,0 @@ };

111

lib/queue.js
/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -56,3 +55,2 @@

var redisDB = 0;
if(_.isObject(redisPort)){

@@ -63,9 +61,12 @@ var opts = redisPort;

redisHost = redisOpts.host;
redisOptions = redisOpts.opts || {};
redisDB = redisOpts.DB || redisDB;
redisOptions = redisOpts.opts;
redisOptions.db = redisOpts.DB;
}
redisOptions = redisOptions || {};
var redisDB = redisOptions.db || 0;
function createClient() {
var client;
if(redisOptions !== undefined && redisOptions.createClient !== undefined){
if(_.isFunction(redisOptions.createClient)){
client = redisOptions.createClient();

@@ -84,2 +85,3 @@ }else{

this.name = name;
this.keyPrefix = redisOptions.keyPrefix || 'bull';

@@ -427,10 +429,6 @@ //

if(this.closed){
return this.closed;
}
return this.processStalledJobs().then(function(){
while(concurrency--){
promises.push(_this.processJobs());
promises.push(new Promise(_this.processJobs));
}

@@ -489,9 +487,13 @@

var _this = this;
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){
return Promise.each(jobs, function(jobId) {
return Job.fromId(_this, jobId).then(_this.processStalledJob);
if(this.closed){
return this.closed;
} else{
return this.client.lrangeAsync(this.toKey('active'), 0, -1).then(function(jobs){
return Promise.each(jobs, function(jobId) {
return Job.fromId(_this, jobId).then(_this.processStalledJob);
});
}).catch(function(err){
console.error(err);
});
}).catch(function(err){
console.error(err);
});
}
};

@@ -511,3 +513,10 @@

return _this.processJob(job, true);
} else {
return job.releaseLock(_this.token);
}
}).catch(function(err) {
// Any uncaught error will come here. We'll ensure that the job lock is released and rethrow
// the error so it bubbles normally
job.releaseLock(_this.token);
throw err;
});

@@ -519,17 +528,19 @@ }

Queue.prototype.processJobs = function(){
Queue.prototype.processJobs = function(resolve, reject){
var _this = this;
var processJobs = this.processJobs.bind(this, resolve, reject);
if(this.closed){
return this.closed;
if(!this.closed){
process.nextTick(function(){
(_this.paused || Promise.resolve())
.then(_this.getNextJob)
.then(_this.processJob)
.then(processJobs, function(err){
console.error('Error processing job:', err);
processJobs();
}).catch(reject);
});
}else{
resolve(this.closed);
}
return (this.paused || Promise.resolve())
.then(this.getNextJob)
.then(this.processJob)
// Avoid https://github.com/OptimalBits/bull/issues/243
.then(this.processJobs, function(err){
console.error(err);
return _this.processJobs;
});
};

@@ -563,6 +574,8 @@

_this.processing--;
return job.moveToCompleted(data).then(function(){
_this.emit('completed', job, data);
return null; // Fixes #253
});
return job.moveToCompleted(data)
.then(job.releaseLock.bind(job, _this.token))
.then(function(){
_this.emit('completed', job, data);
return null; // Fixes #253
});
}

@@ -744,3 +757,11 @@

case 'SET':
jobs = this.client.smembersAsync(key);
jobs = this.client.smembersAsync(key).then(function(jobIds) {
// Can't set a range for smembers. So do the slice programatically instead.
// Note that redis ranges are inclusive, so handling for javascript accordingly
if (end === -1) {
return jobIds.slice(start);
}
return jobIds.slice(start, end + 1);
});
break;

@@ -763,3 +784,3 @@ case 'ZSET':

Queue.prototype.toKey = function(queueType){
return 'bull:' + this.name + ':' + queueType;
return [this.keyPrefix, this.name, queueType].join(':');
};

@@ -790,14 +811,15 @@

//a super hack to deal with different job types
getter = 'get' + type.charAt(0).toUpperCase() + type.slice(1);
if(getter !== 'getCompleted' &&
getter !== 'getWaiting' &&
getter !== 'getActive' &&
getter !== 'getDelayed' &&
getter !== 'getFailed') {
if(_.indexOf([
'completed',
'waiting',
'active',
'delayed',
'failed'], type) === -1){
return reject(new Error('Cannot clean unkown queue type'));
}
getter = 'get' + type.charAt(0).toUpperCase() + type.slice(1);
_this[getter]().then(function (jobs) {
//take all jobs outside of the grace period
// take all jobs outside of the grace period
return Promise.filter(jobs, function (job) {

@@ -807,8 +829,7 @@ return job && ((!job.timestamp) || (job.timestamp < Date.now() - grace));

}).then(function (jobs) {
//remove those old jobs
// remove those old jobs
return Promise.each(jobs, function (job) {
return job.remove();
return job.remove(_this.token);
});
}).then(function (jobs) {
//let everyone know we cleaned up
_this.emit('cleaned', jobs, type);

@@ -815,0 +836,0 @@ resolve(jobs);

@@ -160,4 +160,6 @@ /**

if(!_.isUndefined(job.returnvalue)){
if(job.returnvalue){
args.push(JSON.stringify(job.returnvalue));
}else{
args.push('');
}

@@ -232,8 +234,6 @@

var script = [
'if (redis.call("SISMEMBER", KEYS[5], ARGV[1]) == 0) and (redis.call("SISMEMBER", KEYS[6], ARGV[1]) == 0) then',
' redis.call("LREM", KEYS[1], 0, ARGV[1])',
' redis.call("LREM", KEYS[2], 0, ARGV[1])',
' redis.call("ZREM", KEYS[3], ARGV[1])',
' redis.call("LREM", KEYS[4], 0, ARGV[1])',
'end',
'redis.call("LREM", KEYS[1], 0, ARGV[1])',
'redis.call("LREM", KEYS[2], 0, ARGV[1])',
'redis.call("ZREM", KEYS[3], ARGV[1])',
'redis.call("LREM", KEYS[4], 0, ARGV[1])',
'redis.call("SREM", KEYS[5], ARGV[1])',

@@ -240,0 +240,0 @@ 'redis.call("SREM", KEYS[6], ARGV[1])',

/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -4,0 +3,0 @@

{
"name": "bull",
"version": "1.0.0-rc1",
"version": "1.0.0-rc2",
"description": "Job manager",

@@ -24,3 +24,3 @@ "main": "index.js",

"node-uuid": "^1.4.7",
"redis": "^2.5.3",
"redis": "^2.6.0-2",
"semver": "^4.2.0"

@@ -27,0 +27,0 @@ },

@@ -32,3 +32,13 @@ Bull Job Manager

UIs:
----
There are a few third party UIs that can be used for easier administration of the queues (not in any particular order):
[matador](https://github.com/ShaneK/Matador)
[react-bull](https://github.com/kfatehi/react-bull)
[toureiro](https://github.com/Epharmix/Toureiro)
We also have an official UI which is at the moment bare bones project: [bull-ui](https://github.com/OptimalBits/bull-ui)
Install:

@@ -225,2 +235,8 @@ --------

Important Notes
---------------
The queue aims for "at most once" working strategy. When a worker is processing a job, it will keep the job locked until the work is done. However, it is important that the worker does not lock the event loop too long, otherwise other workers could pick the job believing that the worker processing it has been stalled.
Useful patterns

@@ -593,5 +609,5 @@ ---------------

The only difference is that the Queue#add() allow an options opts.priority that could take
["low", "normal", "medium", "hight", "critical"]. If no options provider, "normal" will be taken.
["low", "normal", "medium", "high", "critical"]. If no options provider, "normal" will be taken.
The priority queue will process more often highter priority jobs than lower.
The priority queue will process more often higher priority jobs than lower.

@@ -598,0 +614,0 @@ ```javascript

/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -4,0 +3,0 @@

/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -4,0 +3,0 @@

/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -73,2 +72,36 @@

});
it('should handle jobs added before and after a redis disconnect', function(done){
var count = 0;
queue.process(function (job, jobDone) {
if(count == 0){
expect(job.data.foo).to.be.equal('bar');
jobDone();
} else {
jobDone();
queue.close().then(done, done);
}
count ++;
}).catch(function(err){
console.log(err);
});
queue.on('completed', function(){
if(count === 1){
queue.bclient.stream.end();
queue.bclient.emit('error', new Error('ECONNRESET'));
}
});
queue.on('ready', function(){
queue.add({ 'foo': 'bar' });
});
queue.on('error', function (err) {
if(count === 1) {
queue.add({ 'foo': 'bar' });
}
});
});
});
/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -76,2 +75,38 @@

it('fails to remove a locked job', function() {
var token = uuid();
return Job.create(queue, 1, {foo: 'bar'}).then(function(job) {
return job.takeLock(token).then(function(lock) {
expect(lock).to.be(true);
}).then(function() {
return job.remove(token);
}).then(function() {
throw new Error('Should not be able to remove a locked job');
}).catch(function(err) {
expect(err.message).to.equal('Could not get lock for job: ' + job.jobId + '. Cannot remove job.');
});
});
});
it('removes any job from active set', function() {
return queue.add({ foo: 'bar' }).then(function(job) {
// Simulate a job in active state but not locked
return queue.moveJob('wait', 'active').then(function() {
return job.isActive().then(function(isActive) {
expect(isActive).to.be(true);
return job.remove();
});
}).then(function() {
return Job.fromId(queue, job.jobId);
}).then(function(stored) {
expect(stored).to.be(null);
return job.getState();
}).then(function(state) {
// This check is a bit of a hack. A job that is not found in any list will return the state
// stuck.
expect(state).to.equal('stuck');
});
});
});
it('emits removed event', function (cb) {

@@ -86,2 +121,26 @@ queue.once('removed', function (job) {

});
it('a succesful job should be removable', function(done) {
queue.process(function () {
return Promise.resolve();
});
queue.add({ foo: 'bar' });
queue.on('completed', function(job) {
job.remove().then(done).catch(done);
});
});
it('a failed job should be removable', function(done) {
queue.process(function () {
throw new Error();
});
queue.add({ foo: 'bar' });
queue.on('failed', function(job) {
job.remove().then(done).catch(done);
});
});
});

@@ -88,0 +147,0 @@

/// <reference path='../typings/mocha/mocha.d.ts'/>
/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -5,0 +4,0 @@

/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -416,3 +415,3 @@

return queueStalled.close(true).then(function(){
return new Promise(function(resolve, reject) {
return new Promise(function(resolve) {
utils.newQueue('test queue stalled').then(function(queue2){

@@ -638,3 +637,3 @@ queue2.LOCK_RENEW_TIME = 100;

});
queue.process(function (job) {
queue.process(function () {
var circular = {};

@@ -820,3 +819,3 @@ circular.x = circular;

it('should pause the queue locally', function(testDone){
var ispaused = false, counter = 2;
var counter = 2;

@@ -1123,3 +1122,3 @@ var queue = utils.buildQueue();

queue.process(4, function (job) {
queue.process(4, function () {
nbProcessing++;

@@ -1426,2 +1425,114 @@ expect(nbProcessing).to.be.lessThan(5);

describe('getJobs', function() {
var queue;
beforeEach(function(){
queue = utils.buildQueue();
return queue.clean(1000);
});
afterEach(function(){
return queue.close();
});
it('should return all completed jobs when not setting start/end', function(done) {
queue.process(function(job, completed) {
completed();
});
queue.on('completed', _.after(3, function() {
queue.getJobs('completed', 'SET').then(function(jobs) {
expect(jobs).to.be.an(Array);
expect(jobs).to.have.length(3);
done();
}).catch(done);
}));
queue.add({ foo: 1 });
queue.add({ foo: 2 });
queue.add({ foo: 3 });
});
it('should return all failed jobs when not setting start/end', function(done) {
queue.process(function(job, completed) {
completed(new Error('error'));
});
queue.on('failed', _.after(3, function() {
queue.getJobs('failed', 'SET').then(function(jobs) {
expect(jobs).to.be.an(Array);
expect(jobs).to.have.length(3);
done();
}).catch(done);
}));
queue.add({ foo: 1 });
queue.add({ foo: 2 });
queue.add({ foo: 3 });
});
it('should return subset of jobs when setting positive range', function(done) {
queue.process(function(job, completed) {
completed();
});
queue.on('completed', _.after(3, function() {
queue.getJobs('completed', 'SET', 1, 2).then(function(jobs) {
expect(jobs).to.be.an(Array);
expect(jobs).to.have.length(2);
expect(jobs[0].data.foo).to.be.equal(2);
expect(jobs[1].data.foo).to.be.eql(3);
done();
}).catch(done);
}));
queue.add({ foo: 1 });
queue.add({ foo: 2 });
queue.add({ foo: 3 });
});
it('should return subset of jobs when setting a negative range', function(done) {
queue.process(function(job, completed) {
completed();
});
queue.on('completed', _.after(3, function() {
queue.getJobs('completed', 'SET', -3, -1).then(function(jobs) {
expect(jobs).to.be.an(Array);
expect(jobs).to.have.length(3);
expect(jobs[0].data.foo).to.be.equal(1);
expect(jobs[1].data.foo).to.be.eql(2);
expect(jobs[2].data.foo).to.be.eql(3);
done();
}).catch(done);
}));
queue.add({ foo: 1 });
queue.add({ foo: 2 });
queue.add({ foo: 3 });
});
it('should return subset of jobs when range overflows', function(done) {
queue.process(function(job, completed) {
completed();
});
queue.on('completed', _.after(3, function() {
queue.getJobs('completed', 'SET', -300, 99999).then(function(jobs) {
expect(jobs).to.be.an(Array);
expect(jobs).to.have.length(3);
expect(jobs[0].data.foo).to.be.equal(1);
expect(jobs[1].data.foo).to.be.eql(2);
expect(jobs[2].data.foo).to.be.eql(3);
done();
}).catch(done);
}));
queue.add({ foo: 1 });
queue.add({ foo: 2 });
queue.add({ foo: 3 });
});
});
describe('Cleaner', function () {

@@ -1428,0 +1539,0 @@ var queue;

/*eslint-env node */
/*global Promise:true */
'use strict';

@@ -4,0 +3,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc