Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 0.1.5 to 0.1.6

107

lib/queue.js

@@ -13,3 +13,3 @@ "use strict";

Gets or creates a new Queue with the given name.
The Queue keeps 4 data structures:

@@ -22,3 +22,3 @@ - wait (list)

/
job -> wait -> active
job -> wait -> active
\

@@ -34,3 +34,3 @@ - > failed

}
var redisDB = 0;

@@ -43,18 +43,36 @@ if(_.isObject(redisPort)){

redisOptions = redisOpts.opts || {};
redisDB = redisOpts.DB;
redisDB = redisOpts.DB;
}
this.name = name;
this.client = redis.createClient(redisPort, redisHost, redisOptions);
this.bclient = redis.createClient(redisPort, redisHost, redisOptions);
this.paused = false;
this.token = uuid();
this.LOCK_RENEW_TIME = LOCK_RENEW_TIME;
var _this = this;
// bubble up Redis error events and attempt to restart queue on
// error recovery.
var redisErrorOccurred = false;
this.client.on('error', function(err){
_this.emit('error', err);
});
this.bclient.on('error', function(err){
_this.emit('error', err);
redisErrorOccurred = true;
});
this.bclient.on('ready', function(){
if(redisErrorOccurred){
redisErrorOccurred = false;
_this.run();
}
});
// Promisify some redis client methods
var _this = this;
var methods = [
'lrange',
'lrange',
'sismember',

@@ -65,2 +83,3 @@ 'set',

'lpush',
'rpush',
'hset',

@@ -75,5 +94,5 @@ 'hmset',

});
this.bclient.brpoplpushAsync = Promise.promisify(this.bclient.BRPOPLPUSH);
this.client.select(redisDB, function(err){

@@ -96,3 +115,3 @@ _this.bclient.select(redisDB, function(err){

is dequeued.
@method process

@@ -106,3 +125,3 @@ */

});
this.handler = handler;

@@ -126,3 +145,4 @@ };

var _this = this;
opts = opts || {};
// If we fail after incrementing the job id we may end having an unused

@@ -133,3 +153,4 @@ // id, but this should not be so harmful

var key = _this.toKey('wait');
return _this.client.lpushAsync(key, jobId).then(function(){
// if queue is LIFO use rpushAsync
return _this.client[(opts.lifo ? 'r' : 'l') + 'pushAsync'](key, jobId).then(function(){
return job;

@@ -157,3 +178,3 @@ });

Empties the queue.
Returns a promise that is resolved after the operation has been completed.

@@ -164,3 +185,3 @@ Note that if some other process is adding jobs at the same time as emptying,

jobs, there will be zombie jobs left in redis.
TODO: Use EVAL to make this operation fully atomic.

@@ -170,6 +191,6 @@ */

var _this = this;
// Get all jobids and empty all lists atomically.
var multi = this.multi();
multi.lrange(this.toKey('wait'), 0, -1);

@@ -179,3 +200,3 @@ multi.lrange(this.toKey('paused'), 0, -1);

multi.del(this.toKey('paused'));
return multi.execAsync().then(function(res){

@@ -188,3 +209,3 @@ var waiting = res[0];

});
jobKeys = jobKeys.concat(_.map(paused, function(jobId){

@@ -196,6 +217,6 @@ return _this.toKey(jobId);

var multi = _this.multi();
multi.del.apply(multi, jobKeys);
return multi.execAsync();
}
}
});

@@ -207,3 +228,3 @@ }

TODO: This pause only pauses the current queue instance, it is not
good enough, we need to pause all instances. It should be great if RENAME can
good enough, we need to pause all instances. It should be great if RENAME can
be used for this. So when pausing we just rename the wait queue to paused.

@@ -218,3 +239,3 @@ BRPOPLPUSH still blocks even when a key does not exist, so it will block

if(this.paused) return this.paused;
var _this = this;

@@ -233,3 +254,3 @@

});
return this.paused;

@@ -294,3 +315,3 @@ }

var _this = this;
return this.getNextJob().then(function(job){

@@ -376,3 +397,3 @@ return _this.processJob(job);

Atomically moves a job from one list to another.
@method moveJob

@@ -385,27 +406,35 @@ */

Queue.prototype.getWaiting = function(start, end){
return this.getJobs('wait');
return this.getJobs('wait', true);
}
Queue.prototype.getActive = function(start, end){
return this.getJobs('active');
return this.getJobs('active', true);
}
Queue.prototype.getCompleted = function(start, end){
Queue.prototype.getCompleted = function(){
return this.getJobs('completed');
}
Queue.prototype.getFailed = function(start, end){
Queue.prototype.getFailed = function(){
return this.getJobs('failed');
}
Queue.prototype.getJobs = function(queueType, start, end){
Queue.prototype.getJobs = function(queueType, isList, start, end){
var _this = this;
start = _.isUndefined(start) ? 0 : start;
end = _.isUndefined(end) ? -1 : end;
var key = this.toKey(queueType);
var jobs;
if(isList){
start = _.isUndefined(start) ? 0 : start;
end = _.isUndefined(end) ? -1 : end;
jobs = this.client.lrangeAsync(key, start, end);
}else{
jobs = this.client.smembersAsync(key);
}
var key = this.toKey(queueType);
//this.client.lrange(key, start, end, function(err, jobIds){
return this.client.smembersAsync(key).then(function(jobIds){
return jobs.then(function(jobIds){
if(jobIds.length){

@@ -412,0 +441,0 @@ return Promise.all(_.map(jobIds, function(jobId){

{
"name": "bull",
"version": "0.1.5",
"version": "0.1.6",
"description": "Job manager",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -6,9 +6,9 @@ Bull Job Manager

A lightweight, robust and fast job processing queue.
A lightweight, robust and fast job processing queue.
Designed with stability and atomicity in mind. The API is inspired by Kue.
It uses redis for persistence, so the queue is not lost if the server goes
It uses redis for persistence, so the queue is not lost if the server goes
down for any reason.
If you need more features than the ones provided by Bull check
If you need more features than the ones provided by Bull check
[Kue](https://github.com/learnboost/kue) but keep in mind this open

@@ -26,2 +26,4 @@ [issue](https://github.com/LearnBoost/kue/issues/130).

Note that you need a redis version higher or equal than 2.6.12 for bull to work.
Quick Guide

@@ -37,15 +39,15 @@ -----------

videoQueue.process(function(job, done){
// job.data contains the custom data passed when the job was created
// job.jobId contains id of this job.
// transcode video asynchronously and report progress
job.progress(42);
// call done when finished
done();
// or give a error if error
done(Error('error transcoding'));
// If the job throws an unhandled exception it is also handled correctly

@@ -58,9 +60,9 @@ throw (Error('some unexpected error'));

job.progress(42);
// call done when finished
done();
// or give a error if error
done(Error('error transcoding'));
// If the job throws an unhandled exception it is also handled correctly

@@ -73,9 +75,9 @@ throw (Error('some unexpected error'));

job.progress(42);
// call done when finished
done();
// or give a error if error
done(Error('error transcoding'));
// If the job throws an unhandled exception it is also handled correctly

@@ -89,3 +91,3 @@ throw (Error('some unexpected error'));

```
A queue can be paused and resumed:

@@ -130,8 +132,8 @@ ```javascript

```
Queues are robust and can be run in parallel in several threads or processes
without any risk of hazards or queue corruption. Check this simple example
without any risk of hazards or queue corruption. Check this simple example
using cluster to parallelize jobs accross processes:
```javascript
var
var
Queue = require('bull'),

@@ -147,3 +149,3 @@ cluster = require('cluster');

}
cluster.on('online', function(worker) {

@@ -155,3 +157,3 @@ // Lets create a few jobs for the queue workers

});
cluster.on('exit', function(worker, code, signal) {

@@ -174,3 +176,3 @@ console.log('worker ' + worker.process.pid + ' died');

Bull can also be used for persistent messsage queues. This is a quite useful
feature in some usecases. For example, you can have two servers that need to
feature in some usecases. For example, you can have two servers that need to
communicate with each other. By using a queue the servers do not need to be online

@@ -184,3 +186,3 @@ at the same time, this create a very robust communication channel:

// receiving.
var sendQueue = Queue("server one message queue", 6379, '127.0.0.1');

@@ -201,8 +203,8 @@ var receiveQueue = Queue("server two message queue", 6379, '127.0.0.1');

A common pattern is where you have a cluster of queue processors that just
A common pattern is where you have a cluster of queue processors that just
process jobs as fast as they can, and some other services that need to take the
result of this processors and do something with it, maybe storing results in a
database.
database.
The most robust and scalable way to accomplish this is by combining the standard
The most robust and scalable way to accomplish this is by combining the standard
job queue with the message queue pattern: a service sends jobs to the cluster

@@ -220,8 +222,8 @@ just by opening a job queue and adding jobs to it, the cluster will start

* [Queue##add](#add)
* [Queue##add](#count)
* [Queue##add](#empty)
* [Queue##count](#count)
* [Queue##empty](#empty)
* [Job](#job)
* [Job##remove](#remove)
## Reference

@@ -232,8 +234,8 @@

This is the Queue constructor. It creates a new Queue that is persisted in
Redis. Everytime the same queue is instantiated it tries to process all the
This is the Queue constructor. It creates a new Queue that is persisted in
Redis. Everytime the same queue is instantiated it tries to process all the
old jobs that may exist from a previous unfinished session.
__Arguments__
```javascript

@@ -248,3 +250,3 @@ queueName {String} A unique name for this Queue.

<a name="process"/>

@@ -259,5 +261,5 @@ #### Queue##process(function(job, done))

to signal that the job did not complete successfully.
__Arguments__
```javascript

@@ -269,3 +271,3 @@ jobName {String} A job type name.

---------------------------------------
<a name="add"/>

@@ -275,7 +277,7 @@ #### Queue##add(data, opts)

Creates a new job and adds it to the queue. If the queue is empty the job
will be executed directly, otherwise it will be placed in the queue and
will be executed directly, otherwise it will be placed in the queue and
executed as soon as possible.
__Arguments__
```javascript

@@ -286,2 +288,4 @@ data {PlainObject} A plain object with arguments that will be passed

to the job processing function in job.opts
opts.lifo {Boolean} A boolean which, if true, adds the job to the right
of the queue instead of the left (default false)
returns {Promise} A promise that resolves when the job has been succesfully

@@ -302,3 +306,3 @@ added to the queue (or rejects if some error occured).

__Arguments__
```javascript

@@ -316,3 +320,3 @@ returns {Promise} A promise that resolves with the current jobs count.

__Arguments__
```javascript

@@ -329,5 +333,5 @@ returns {Promise} A promise that resolves with the queue is emptied.

method needed to update its progress.
The most important property for the user is Job##data that includes the
object that was passed to Queue##add, and that is normally used to
object that was passed to Queue##add, and that is normally used to
perform the job.

@@ -343,3 +347,3 @@

__Arguments__
```javascript

@@ -352,3 +356,3 @@ returns {Promise} A promise that resolves when the job is removed.

##License
##License

@@ -355,0 +359,0 @@ (The MIT License)

var Job = require('../lib/job');
var Queue = require('../');
var expect = require('expect.js');
var bluebird = require('bluebird');
var Promise = require('bluebird');

@@ -10,3 +10,3 @@ var STD_QUEUE_NAME = 'test queue';

var queue;
beforeEach(function(done){

@@ -16,3 +16,3 @@ queue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1');

});
afterEach(function(done){

@@ -24,40 +24,40 @@ queue.empty().then(function(){

})
it('create a queue with standard redis opts', function(done){
var queue = Queue('standard');
queue.once('ready', function(){
expect(queue.client.host).to.be('127.0.0.1');
expect(queue.bclient.host).to.be('127.0.0.1');
expect(queue.client.port).to.be(6379);
expect(queue.bclient.port).to.be(6379);
expect(queue.client.selected_db).to.be(0);
expect(queue.bclient.selected_db).to.be(0);
done();
});
});
it('create a queue using custom redis paramters', function(done){
var queue = Queue('custom', {redis: {DB: 1}});
queue.once('ready', function(){
expect(queue.client.host).to.be('127.0.0.1');
expect(queue.bclient.host).to.be('127.0.0.1');
expect(queue.client.port).to.be(6379);
expect(queue.bclient.port).to.be(6379);
expect(queue.client.selected_db).to.be(1);
expect(queue.bclient.selected_db).to.be(1);
done();
});
});
})
it('create a queue with dots in its name', function(done){
var queue = Queue('using. dots. in.name.');
queue.process(function(job, jobDone){

@@ -68,3 +68,3 @@ expect(job.data.foo).to.be.equal('bar')

})
queue.add({foo: 'bar'}).then(function(job){

@@ -77,3 +77,21 @@ expect(job.jobId).to.be.ok()

});
it('should recover from a connection loss', function(done){
queue = Queue('test connection loss');
queue.on('error', function(err){
// error event has to be observed or the exception will bubble up
}).process(function(job, jobDone){
expect(job.data.foo).to.be.equal('bar');
jobDone();
done();
});
// Simulate disconnect
queue.bclient.stream.end();
queue.bclient.emit('error', new Error('ECONNRESET'));
// add something to the queue
queue.add({'foo': 'bar'});
});
it('process a job', function(done){

@@ -85,3 +103,3 @@ queue.process(function(job, jobDone){

})
queue.add({foo: 'bar'}).then(function(job){

@@ -94,3 +112,3 @@ expect(job.jobId).to.be.ok()

});
it('process a job that updates progress', function(done){

@@ -102,3 +120,3 @@ queue.process(function(job, jobDone){

});
queue.add({foo: 'bar'}).then(function(job){

@@ -110,3 +128,3 @@ expect(job.jobId).to.be.ok()

});
queue.on('progress', function(job, progress){

@@ -118,3 +136,3 @@ expect(job).to.be.ok();

});
it('process a job that returns data in the process handler', function(done){

@@ -125,3 +143,3 @@ queue.process(function(job, jobDone){

});
queue.add({foo: 'bar'}).then(function(job){

@@ -133,3 +151,3 @@ expect(job.jobId).to.be.ok()

});
queue.on('completed', function(job, data){

@@ -146,3 +164,3 @@ expect(job).to.be.ok();

var jobs = [
queueStalled.add({bar: 'baz'}),
queueStalled.add({bar: 'baz'}),
queueStalled.add({bar1: 'baz1'}),

@@ -152,7 +170,7 @@ queueStalled.add({bar2: 'baz2'}),

bluebird.all(jobs).then(function(){
Promise.all(jobs).then(function(){
queueStalled.process(function(job){
// instead of completing we just close the queue to simulate a crash.
queueStalled.close();
setTimeout(function(){

@@ -186,3 +204,3 @@ var queue2 = Queue('test queue stalled', 6379, '127.0.0.1');

queue.LOCK_RENEW_TIME = 10;
for(var j=0; j<NUM_JOBS_PER_QUEUE; j++){

@@ -193,3 +211,3 @@ jobs.push(queue.add({job: j}));

bluebird.all(jobs).then(function(){
Promise.all(jobs).then(function(){
var processed = 0;

@@ -200,3 +218,3 @@ for(var k=0; k<stalledQueues.length; k++){

this.close();
processed ++;

@@ -223,3 +241,3 @@ if(processed === stalledQueues.length){

});
it('does not process a job that is being processed when a new queue starts', function(done){

@@ -230,3 +248,3 @@ var jobId;

});
queue.process(function(job, jobDone){

@@ -243,3 +261,3 @@ expect(job.data.foo).to.be.equal('bar')

});
queue.on('completed', function(job){

@@ -249,3 +267,3 @@ anotherQueue.close();

});
var anotherQueue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1');

@@ -262,3 +280,3 @@

});
it.skip('process stalled jobs without requiring a queue restart');

@@ -272,3 +290,3 @@

})
queue.add({foo: 'bar'}).then(function(job){

@@ -280,3 +298,3 @@ expect(job.jobId).to.be.ok()

});
queue.once('failed', function(job, err){

@@ -289,3 +307,3 @@ expect(job.jobId).to.be.ok()

});
it('process a job that throws an exception', function(done){

@@ -297,3 +315,3 @@ var jobError = new Error("Job Failed");

});
queue.add({foo: 'bar'}).then(function(job){

@@ -305,3 +323,3 @@ expect(job.jobId).to.be.ok()

});
queue.once('failed', function(job, err){

@@ -314,3 +332,3 @@ expect(job.jobId).to.be.ok()

});
it.skip('retry a job that fails', function(done){

@@ -322,3 +340,3 @@ var jobError = new Error("Job Failed");

})
queue.add({foo: 'bar'}).then(function(job){

@@ -330,3 +348,3 @@ expect(job.jobId).to.be.ok()

});
queue.once('failed', function(job, err){

@@ -339,7 +357,7 @@ expect(job.jobId).to.be.ok()

});
it('process several jobs serially', function(done){
var counter = 1;
var maxJobs = 100;
queue.process(function(job, jobDone){

@@ -352,3 +370,3 @@ expect(job.data.num).to.be.equal(counter);

});
for(var i=1; i<=maxJobs; i++){

@@ -358,3 +376,3 @@ queue.add({foo: 'bar', num: i});

});
it('count added, unprocessed jobs', function(done){

@@ -364,3 +382,3 @@ var counter = 1;

var added = [];
for(var i=1; i<=maxJobs; i++){

@@ -370,6 +388,6 @@ added.push(queue.add({foo: 'bar', num: i}));

bluebird.all(added).then(function(){
Promise.all(added).then(function(){
queue.count().then(function(count){
expect(count).to.be(100);
queue.empty().then(function(){

@@ -384,6 +402,6 @@ queue.count().then(function(count){

});
it('add jobs to a paused queue', function(done){
var ispaused = false, counter = 2;
queue.process(function(job, jobDone){

@@ -396,10 +414,10 @@ expect(ispaused).to.be(false);

});
queue.pause();
ispaused = true;
queue.add({foo: 'paused'});
queue.add({foo: 'paused'});
setTimeout(function(){

@@ -411,6 +429,6 @@ ispaused = false;

});
it('paused a running queue', function(done){
var ispaused = false, isresumed = true, first = true;
queue.process(function(job, jobDone){

@@ -420,3 +438,3 @@ expect(ispaused).to.be(false);

jobDone();
if(first){

@@ -429,8 +447,8 @@ first = false;

done();
}
}
});
queue.add({foo: 'paused'});
queue.add({foo: 'paused'});
queue.on('paused', function(){

@@ -442,9 +460,117 @@ setTimeout(function(){

});
queue.on('resumed', function(){
isresumed = true;
});
});
it('process a lifo queue', function(done){
var currentValue = 0, first = true;
queue = Queue('test lifo');
queue.once('ready', function(){
queue.process(function(job, jobDone){
// Catching the job before the pause
if(first){
expect(job.data.count).to.be.equal(0);
first = false;
return jobDone();
}
expect(job.data.count).to.be.equal(currentValue--);
jobDone();
if(currentValue === 0){
done();
}
});
// Add a job to pend proccessing
queue.add({'count': 0}).then(function(){
queue.pause().then(function(){
// Add a series of jobs in a predictable order
fn = function(cb){
queue.add({'count': ++currentValue}, {'lifo': true}).then(cb);
};
fn(fn(fn(fn(function(){
queue.resume();
}))));
});
});
});
});
describe("Jobs getters", function(){
it('should get waitting jobs', function(done){
Promise.join(queue.add({foo: 'bar'}), queue.add({baz: 'qux'})).then(function(){
queue.getWaiting().then(function(jobs){
expect(jobs).to.be.a('array');
expect(jobs.length).to.be.equal(2);
expect(jobs[1].data.foo).to.be.equal('bar');
expect(jobs[0].data.baz).to.be.equal('qux');
done();
})
});
});
it('should get active jobs', function(done){
var counter = 2;
queue.process(function(job, jobDone){
queue.getActive().then(function(jobs){
expect(jobs).to.be.a('array');
expect(jobs.length).to.be.equal(1);
expect(jobs[0].data.foo).to.be.equal('bar');
done();
});
jobDone();
});
queue.add({foo: 'bar'});
});
it('should get completed jobs', function(){
var counter = 2;
queue.process(function(job, jobDone){
jobDone();
});
queue.on('completed', function(){
counter --;
if(counter === 0){
queue.getCompleted().then(function(jobs){
expect(jobs).to.be.a('array');
// We need a "empty completed" kind of function.
//expect(jobs.length).to.be.equal(2);
done();
});
}
});
queue.add({foo: 'bar'});
queue.add({baz: 'qux'});
});
it('should get failed jobs', function(done){
var counter = 2;
queue.process(function(job, jobDone){
jobDone(Error("Forced error"));
});
queue.on('failed', function(){
counter --;
if(counter === 0){
queue.getFailed().then(function(jobs){
expect(jobs).to.be.a('array');
done();
});
}
});
queue.add({foo: 'bar'});
queue.add({baz: 'qux'});
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc