Comparing version 0.1.0 to 0.1.1
181
lib/queue.js
@@ -7,2 +7,3 @@ "use strict"; | ||
var Job = require('./job'); | ||
var _ = require('lodash'); | ||
@@ -38,2 +39,51 @@ /** | ||
Queue.prototype.process = function(jobName, handler){ | ||
this.handlers[jobName] = handler; | ||
}; | ||
/** | ||
interface JobOptions | ||
{ | ||
priority: Priority; | ||
attempts: number; | ||
} | ||
*/ | ||
/** | ||
@param name: string Name representing this type of job. | ||
@param data: {} Custom data to store for this job. Should be JSON serializable. | ||
@param opts: JobOptions Options for this job. | ||
*/ | ||
Queue.prototype.createJob = function(name, data, opts){ | ||
var deferred = when.defer(); | ||
var _this = this; | ||
// If we fail after incrementing the jobID we may end having an unused | ||
// id, but this should not be so harmful | ||
this.client.INCR(this.toKey('id'), function(err, jobId){ | ||
if(err){ | ||
deferred.reject(); | ||
}else{ | ||
deferred.resolve(jobId); | ||
} | ||
}); | ||
return deferred.promise.then(function(jobId){ | ||
return Job.create(_this, jobId, name, data, opts).then(function(job){ | ||
var deferred = when.defer(); | ||
var key = _this.toKey('wait'); | ||
_this.client.LPUSH(key, jobId, function(err){ | ||
if(err){ | ||
deferred.reject(err); | ||
}else{ | ||
deferred.resolve(job); | ||
} | ||
}); | ||
return deferred.promise; | ||
}); | ||
}); | ||
} | ||
Queue.prototype.run = function(){ | ||
@@ -84,18 +134,19 @@ var _this = this; | ||
if(handler){ | ||
handler(job, function(err){ | ||
var promise; | ||
if(err){ | ||
promise = job.failed(err); | ||
promise.then(function(){ | ||
_this.emit('failed', job, err); | ||
}) | ||
}else{ | ||
promise = job.completed(); | ||
promise.then(function(){ | ||
_this.emit('completed', job); | ||
}); | ||
} | ||
deferred.resolve(promise); | ||
}); | ||
try{ | ||
handler(job, function(err){ | ||
if(err){ | ||
failed(err); | ||
}else{ | ||
completed(); | ||
} | ||
}); | ||
}catch(err){ | ||
failed(err) | ||
} | ||
}else{ | ||
// We just discard the job it if no handler is cd available. | ||
completed(); | ||
} | ||
function completed(){ | ||
var promise = job.completed(); | ||
@@ -107,53 +158,14 @@ promise.then(function(){ | ||
} | ||
function failed(err){ | ||
var promise = job.failed(err); | ||
promise.then(function(){ | ||
_this.emit('failed', job, err); | ||
}) | ||
deferred.resolve(promise); | ||
} | ||
return deferred.promise; | ||
} | ||
Queue.prototype.process = function(jobName, handler){ | ||
this.handlers[jobName] = handler; | ||
}; | ||
/** | ||
interface JobOptions | ||
{ | ||
priority: Priority; | ||
attempts: number; | ||
} | ||
*/ | ||
/** | ||
@param name: string Name representing this type of job. | ||
@param data: {} Custom data to store for this job. Should be JSON serializable. | ||
@param opts: JobOptions Options for this job. | ||
*/ | ||
Queue.prototype.createJob = function(name, data, opts){ | ||
var deferred = when.defer(); | ||
var _this = this; | ||
// If we fail after incrementing the jobID we may end having an unused | ||
// id, but this should not be so harmful | ||
this.client.INCR(this.toKey('id'), function(err, jobId){ | ||
if(err){ | ||
deferred.reject(); | ||
}else{ | ||
deferred.resolve(jobId); | ||
} | ||
}); | ||
return deferred.promise.then(function(jobId){ | ||
return Job.create(_this, jobId, name, data, opts).then(function(job){ | ||
var deferred = when.defer(); | ||
var key = _this.toKey('wait'); | ||
_this.client.LPUSH(key, jobId, function(err){ | ||
if(err){ | ||
deferred.reject(err); | ||
}else{ | ||
deferred.resolve(job); | ||
} | ||
}); | ||
return deferred.promise; | ||
}); | ||
}); | ||
} | ||
// | ||
Queue.prototype.getNextJob = function(){ | ||
@@ -178,2 +190,45 @@ var _this = this; | ||
Queue.prototype.getWaiting = function(start, end){ | ||
return this.getJobs('wait'); | ||
} | ||
Queue.prototype.getActive = function(start, end){ | ||
return this.getJobs('active'); | ||
} | ||
Queue.prototype.getCompleted = function(start, end){ | ||
return this.getJobs('completed'); | ||
} | ||
Queue.prototype.getFailed = function(start, end){ | ||
return this.getJobs('failed'); | ||
} | ||
Queue.prototype.getJobs = function(queueType, start, end){ | ||
var defer = when.defer(); | ||
var _this = this; | ||
start = _.isUndefined(start) ? 0 : start; | ||
end = _.isUndefined(end) ? -1 : end; | ||
var key = this.toKey(queueType); | ||
//this.client.lrange(key, start, end, function(err, jobIds){ | ||
this.client.smembers(key, function(err, jobIds){ | ||
if(err){ | ||
defer.reject(err); | ||
}else{ | ||
if(jobIds.length){ | ||
defer.resolve( | ||
when.all(_.map(jobIds, function(jobId){ | ||
return Job.fromId(_this, jobId); | ||
})) | ||
); | ||
}else{ | ||
defer.resolve([]); | ||
} | ||
} | ||
}); | ||
return defer.promise; | ||
} | ||
Queue.prototype.toKey = function(queueType){ | ||
@@ -180,0 +235,0 @@ return 'bull:' + this.name + ':' + queueType; |
{ | ||
"name": "bull", | ||
"version": "0.1.0", | ||
"version": "0.1.1", | ||
"description": "Job manager", | ||
@@ -22,3 +22,4 @@ "main": "index.js", | ||
"redis": "~0.8.4", | ||
"when": "~2.1.1" | ||
"when": "~2.1.1", | ||
"lodash": "~2.2.1" | ||
}, | ||
@@ -25,0 +26,0 @@ "devDependencies": { |
@@ -6,3 +6,3 @@ Bull Job Manager | ||
A minimalistic, robust and fast job processing queue. | ||
A lightweight, robust and fast job processing queue. | ||
Designed with stability and atomicity in mind. The API is inspired by Kue. | ||
@@ -13,3 +13,3 @@ | ||
If you need more features than the one provided by Bull check | ||
If you need more features than the ones provided by Bull check | ||
[Kue](https://github.com/learnboost/kue) but keep in mind this open | ||
@@ -33,2 +33,5 @@ [issue](https://github.com/LearnBoost/kue/issues/130). | ||
queue.process('video transcode', function(job, done){ | ||
// job.data contains the custom data passed when the job was created | ||
// transcode video asynchronously and report progress | ||
@@ -42,2 +45,5 @@ job.progress(42); | ||
done(Error('error transcoding')); | ||
// If the job throws an unhandled exception it is also handled correctly | ||
throw (Error('some unexpected error')); | ||
}); | ||
@@ -54,2 +60,5 @@ | ||
done(Error('error transcoding')); | ||
// If the job throws an unhandled exception it is also handled correctly | ||
throw (Error('some unexpected error')); | ||
}); | ||
@@ -66,2 +75,5 @@ | ||
done(Error('error transcoding')); | ||
// If the job throws an unhandled exception it is also handled correctly | ||
throw (Error('some unexpected error')); | ||
}); | ||
@@ -85,4 +97,39 @@ | ||
Queues are cheap, so if you need many of them just create new ones with different | ||
names: | ||
var userJohn = new Queue('john'); | ||
var userLisa = new Queue('lisa'); | ||
. | ||
. | ||
. | ||
Queues are robust and can be run in parallel in several threads or processes | ||
without any risk of hazzards or queue corruption. Check this simple example | ||
using cluster to parallelize jobs accross processes: | ||
var | ||
Queue = require('bull'), | ||
cluster = require('cluster'); | ||
var numWorkers = 8; | ||
var queue = new Queue("test concurrent queue", 6379, '127.0.0.1'); | ||
queue.process('test concurrent job', function(job, jobDone){ | ||
if(cluster.isMaster){ | ||
console.log("Job done in master", job.jobId); | ||
}else{ | ||
console.log("Job done by worker", cluster.worker.id, job.jobId); | ||
} | ||
jobDone(); | ||
}); | ||
if(cluster.isMaster){ | ||
for (var i = 0; i < numWorkers; i++) { | ||
cluster.fork(); | ||
} | ||
} | ||
##Documentation | ||
@@ -89,0 +136,0 @@ |
@@ -72,3 +72,3 @@ var Job = require('../lib/job'); | ||
it('completed', function(done){ | ||
Job.create(queue, 3, 'test job progress', {foo: 'bar'}).then(function(job){ | ||
Job.create(queue, 3, 'test job completed', {foo: 'bar'}).then(function(job){ | ||
return job.isCompleted().then(function(isCompleted){ | ||
@@ -90,3 +90,3 @@ expect(isCompleted).to.be(false); | ||
it('failed', function(done){ | ||
Job.create(queue, 4, 'test job progress', {foo: 'bar'}).then(function(job){ | ||
Job.create(queue, 4, 'test job failed', {foo: 'bar'}).then(function(job){ | ||
return job.isFailed().then(function(isFailed){ | ||
@@ -93,0 +93,0 @@ expect(isFailed).to.be(false); |
@@ -73,3 +73,3 @@ var Job = require('../lib/job'); | ||
queue.on('failed', function(job, err){ | ||
queue.once('failed', function(job, err){ | ||
expect(job.jobId).to.be.ok() | ||
@@ -82,2 +82,46 @@ expect(job.name).to.be('test job fails') | ||
it('process a job that throws an exception', function(done){ | ||
var jobError = new Error("Job Failed"); | ||
queue.process('test job throws exception', function(job, jobDone){ | ||
expect(job.data.foo).to.be.equal('bar') | ||
throw jobError; | ||
}); | ||
queue.createJob('test job throws exception', {foo: 'bar'}).then(function(job){ | ||
expect(job.jobId).to.be.ok() | ||
expect(job.name).to.be('test job throws exception') | ||
}).otherwise(function(err){ | ||
done(err); | ||
}); | ||
queue.once('failed', function(job, err){ | ||
expect(job.jobId).to.be.ok() | ||
expect(job.name).to.be('test job throws exception') | ||
expect(err).to.be.eql(jobError); | ||
done(); | ||
}); | ||
}); | ||
it.skip('retry a job that fails', function(done){ | ||
var jobError = new Error("Job Failed"); | ||
queue.process('test job fails retry', function(job, jobDone){ | ||
expect(job.data.foo).to.be.equal('bar') | ||
jobDone(jobError); | ||
}) | ||
queue.createJob('test job fails retry', {foo: 'bar'}).then(function(job){ | ||
expect(job.jobId).to.be.ok() | ||
expect(job.name).to.be('test job fails retry') | ||
}).otherwise(function(err){ | ||
done(err); | ||
}); | ||
queue.once('failed', function(job, err){ | ||
expect(job.jobId).to.be.ok() | ||
expect(job.name).to.be('test job fails retry') | ||
expect(err).to.be.eql(jobError); | ||
done(); | ||
}); | ||
}); | ||
it('process several jobs serially', function(done){ | ||
@@ -98,3 +142,3 @@ var counter = 1; | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
24342
11
565
233
3
+ Addedlodash@~2.2.1
+ Addedlodash@2.2.1(transitive)