Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
197
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 0.1.0 to 0.1.1

lib/cluster-queue.js

181

lib/queue.js

@@ -7,2 +7,3 @@ "use strict";

var Job = require('./job');
var _ = require('lodash');

@@ -38,2 +39,51 @@ /**

Queue.prototype.process = function(jobName, handler){
this.handlers[jobName] = handler;
};
/**
interface JobOptions
{
priority: Priority;
attempts: number;
}
*/
/**
@param name: string Name representing this type of job.
@param data: {} Custom data to store for this job. Should be JSON serializable.
@param opts: JobOptions Options for this job.
*/
Queue.prototype.createJob = function(name, data, opts){
var deferred = when.defer();
var _this = this;
// If we fail after incrementing the jobID we may end having an unused
// id, but this should not be so harmful
this.client.INCR(this.toKey('id'), function(err, jobId){
if(err){
deferred.reject();
}else{
deferred.resolve(jobId);
}
});
return deferred.promise.then(function(jobId){
return Job.create(_this, jobId, name, data, opts).then(function(job){
var deferred = when.defer();
var key = _this.toKey('wait');
_this.client.LPUSH(key, jobId, function(err){
if(err){
deferred.reject(err);
}else{
deferred.resolve(job);
}
});
return deferred.promise;
});
});
}
Queue.prototype.run = function(){

@@ -84,18 +134,19 @@ var _this = this;

if(handler){
handler(job, function(err){
var promise;
if(err){
promise = job.failed(err);
promise.then(function(){
_this.emit('failed', job, err);
})
}else{
promise = job.completed();
promise.then(function(){
_this.emit('completed', job);
});
}
deferred.resolve(promise);
});
try{
handler(job, function(err){
if(err){
failed(err);
}else{
completed();
}
});
}catch(err){
failed(err)
}
}else{
// We just discard the job it if no handler is cd available.
completed();
}
function completed(){
var promise = job.completed();

@@ -107,53 +158,14 @@ promise.then(function(){

}
function failed(err){
var promise = job.failed(err);
promise.then(function(){
_this.emit('failed', job, err);
})
deferred.resolve(promise);
}
return deferred.promise;
}
Queue.prototype.process = function(jobName, handler){
this.handlers[jobName] = handler;
};
/**
interface JobOptions
{
priority: Priority;
attempts: number;
}
*/
/**
@param name: string Name representing this type of job.
@param data: {} Custom data to store for this job. Should be JSON serializable.
@param opts: JobOptions Options for this job.
*/
Queue.prototype.createJob = function(name, data, opts){
var deferred = when.defer();
var _this = this;
// If we fail after incrementing the jobID we may end having an unused
// id, but this should not be so harmful
this.client.INCR(this.toKey('id'), function(err, jobId){
if(err){
deferred.reject();
}else{
deferred.resolve(jobId);
}
});
return deferred.promise.then(function(jobId){
return Job.create(_this, jobId, name, data, opts).then(function(job){
var deferred = when.defer();
var key = _this.toKey('wait');
_this.client.LPUSH(key, jobId, function(err){
if(err){
deferred.reject(err);
}else{
deferred.resolve(job);
}
});
return deferred.promise;
});
});
}
//
Queue.prototype.getNextJob = function(){

@@ -178,2 +190,45 @@ var _this = this;

Queue.prototype.getWaiting = function(start, end){
return this.getJobs('wait');
}
Queue.prototype.getActive = function(start, end){
return this.getJobs('active');
}
Queue.prototype.getCompleted = function(start, end){
return this.getJobs('completed');
}
Queue.prototype.getFailed = function(start, end){
return this.getJobs('failed');
}
Queue.prototype.getJobs = function(queueType, start, end){
var defer = when.defer();
var _this = this;
start = _.isUndefined(start) ? 0 : start;
end = _.isUndefined(end) ? -1 : end;
var key = this.toKey(queueType);
//this.client.lrange(key, start, end, function(err, jobIds){
this.client.smembers(key, function(err, jobIds){
if(err){
defer.reject(err);
}else{
if(jobIds.length){
defer.resolve(
when.all(_.map(jobIds, function(jobId){
return Job.fromId(_this, jobId);
}))
);
}else{
defer.resolve([]);
}
}
});
return defer.promise;
}
Queue.prototype.toKey = function(queueType){

@@ -180,0 +235,0 @@ return 'bull:' + this.name + ':' + queueType;

{
"name": "bull",
"version": "0.1.0",
"version": "0.1.1",
"description": "Job manager",

@@ -22,3 +22,4 @@ "main": "index.js",

"redis": "~0.8.4",
"when": "~2.1.1"
"when": "~2.1.1",
"lodash": "~2.2.1"
},

@@ -25,0 +26,0 @@ "devDependencies": {

@@ -6,3 +6,3 @@ Bull Job Manager

A minimalistic, robust and fast job processing queue.
A lightweight, robust and fast job processing queue.
Designed with stability and atomicity in mind. The API is inspired by Kue.

@@ -13,3 +13,3 @@

If you need more features than the one provided by Bull check
If you need more features than the ones provided by Bull check
[Kue](https://github.com/learnboost/kue) but keep in mind this open

@@ -33,2 +33,5 @@ [issue](https://github.com/LearnBoost/kue/issues/130).

queue.process('video transcode', function(job, done){
// job.data contains the custom data passed when the job was created
// transcode video asynchronously and report progress

@@ -42,2 +45,5 @@ job.progress(42);

done(Error('error transcoding'));
// If the job throws an unhandled exception it is also handled correctly
throw (Error('some unexpected error'));
});

@@ -54,2 +60,5 @@

done(Error('error transcoding'));
// If the job throws an unhandled exception it is also handled correctly
throw (Error('some unexpected error'));
});

@@ -66,2 +75,5 @@

done(Error('error transcoding'));
// If the job throws an unhandled exception it is also handled correctly
throw (Error('some unexpected error'));
});

@@ -85,4 +97,39 @@

Queues are cheap, so if you need many of them just create new ones with different
names:
var userJohn = new Queue('john');
var userLisa = new Queue('lisa');
.
.
.
Queues are robust and can be run in parallel in several threads or processes
without any risk of hazzards or queue corruption. Check this simple example
using cluster to parallelize jobs accross processes:
var
Queue = require('bull'),
cluster = require('cluster');
var numWorkers = 8;
var queue = new Queue("test concurrent queue", 6379, '127.0.0.1');
queue.process('test concurrent job', function(job, jobDone){
if(cluster.isMaster){
console.log("Job done in master", job.jobId);
}else{
console.log("Job done by worker", cluster.worker.id, job.jobId);
}
jobDone();
});
if(cluster.isMaster){
for (var i = 0; i < numWorkers; i++) {
cluster.fork();
}
}
##Documentation

@@ -89,0 +136,0 @@

@@ -72,3 +72,3 @@ var Job = require('../lib/job');

it('completed', function(done){
Job.create(queue, 3, 'test job progress', {foo: 'bar'}).then(function(job){
Job.create(queue, 3, 'test job completed', {foo: 'bar'}).then(function(job){
return job.isCompleted().then(function(isCompleted){

@@ -90,3 +90,3 @@ expect(isCompleted).to.be(false);

it('failed', function(done){
Job.create(queue, 4, 'test job progress', {foo: 'bar'}).then(function(job){
Job.create(queue, 4, 'test job failed', {foo: 'bar'}).then(function(job){
return job.isFailed().then(function(isFailed){

@@ -93,0 +93,0 @@ expect(isFailed).to.be(false);

@@ -73,3 +73,3 @@ var Job = require('../lib/job');

queue.on('failed', function(job, err){
queue.once('failed', function(job, err){
expect(job.jobId).to.be.ok()

@@ -82,2 +82,46 @@ expect(job.name).to.be('test job fails')

it('process a job that throws an exception', function(done){
var jobError = new Error("Job Failed");
queue.process('test job throws exception', function(job, jobDone){
expect(job.data.foo).to.be.equal('bar')
throw jobError;
});
queue.createJob('test job throws exception', {foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok()
expect(job.name).to.be('test job throws exception')
}).otherwise(function(err){
done(err);
});
queue.once('failed', function(job, err){
expect(job.jobId).to.be.ok()
expect(job.name).to.be('test job throws exception')
expect(err).to.be.eql(jobError);
done();
});
});
it.skip('retry a job that fails', function(done){
var jobError = new Error("Job Failed");
queue.process('test job fails retry', function(job, jobDone){
expect(job.data.foo).to.be.equal('bar')
jobDone(jobError);
})
queue.createJob('test job fails retry', {foo: 'bar'}).then(function(job){
expect(job.jobId).to.be.ok()
expect(job.name).to.be('test job fails retry')
}).otherwise(function(err){
done(err);
});
queue.once('failed', function(job, err){
expect(job.jobId).to.be.ok()
expect(job.name).to.be('test job fails retry')
expect(err).to.be.eql(jobError);
done();
});
});
it('process several jobs serially', function(done){

@@ -98,3 +142,3 @@ var counter = 1;

});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc