Comparing version 0.1.5 to 0.1.6
107
lib/queue.js
@@ -13,3 +13,3 @@ "use strict"; | ||
Gets or creates a new Queue with the given name. | ||
The Queue keeps 4 data structures: | ||
@@ -22,3 +22,3 @@ - wait (list) | ||
/ | ||
job -> wait -> active | ||
job -> wait -> active | ||
\ | ||
@@ -34,3 +34,3 @@ - > failed | ||
} | ||
var redisDB = 0; | ||
@@ -43,18 +43,36 @@ if(_.isObject(redisPort)){ | ||
redisOptions = redisOpts.opts || {}; | ||
redisDB = redisOpts.DB; | ||
redisDB = redisOpts.DB; | ||
} | ||
this.name = name; | ||
this.client = redis.createClient(redisPort, redisHost, redisOptions); | ||
this.bclient = redis.createClient(redisPort, redisHost, redisOptions); | ||
this.paused = false; | ||
this.token = uuid(); | ||
this.LOCK_RENEW_TIME = LOCK_RENEW_TIME; | ||
var _this = this; | ||
// bubble up Redis error events and attempt to restart queue on | ||
// error recovery. | ||
var redisErrorOccurred = false; | ||
this.client.on('error', function(err){ | ||
_this.emit('error', err); | ||
}); | ||
this.bclient.on('error', function(err){ | ||
_this.emit('error', err); | ||
redisErrorOccurred = true; | ||
}); | ||
this.bclient.on('ready', function(){ | ||
if(redisErrorOccurred){ | ||
redisErrorOccurred = false; | ||
_this.run(); | ||
} | ||
}); | ||
// Promisify some redis client methods | ||
var _this = this; | ||
var methods = [ | ||
'lrange', | ||
'lrange', | ||
'sismember', | ||
@@ -65,2 +83,3 @@ 'set', | ||
'lpush', | ||
'rpush', | ||
'hset', | ||
@@ -75,5 +94,5 @@ 'hmset', | ||
}); | ||
this.bclient.brpoplpushAsync = Promise.promisify(this.bclient.BRPOPLPUSH); | ||
this.client.select(redisDB, function(err){ | ||
@@ -96,3 +115,3 @@ _this.bclient.select(redisDB, function(err){ | ||
is dequeued. | ||
@method process | ||
@@ -106,3 +125,3 @@ */ | ||
}); | ||
this.handler = handler; | ||
@@ -126,3 +145,4 @@ }; | ||
var _this = this; | ||
opts = opts || {}; | ||
// If we fail after incrementing the job id we may end having an unused | ||
@@ -133,3 +153,4 @@ // id, but this should not be so harmful | ||
var key = _this.toKey('wait'); | ||
return _this.client.lpushAsync(key, jobId).then(function(){ | ||
// if queue is LIFO use rpushAsync | ||
return _this.client[(opts.lifo ? 'r' : 'l') + 'pushAsync'](key, jobId).then(function(){ | ||
return job; | ||
@@ -157,3 +178,3 @@ }); | ||
Empties the queue. | ||
Returns a promise that is resolved after the operation has been completed. | ||
@@ -164,3 +185,3 @@ Note that if some other process is adding jobs at the same time as emptying, | ||
jobs, there will be zombie jobs left in redis. | ||
TODO: Use EVAL to make this operation fully atomic. | ||
@@ -170,6 +191,6 @@ */ | ||
var _this = this; | ||
// Get all jobids and empty all lists atomically. | ||
var multi = this.multi(); | ||
multi.lrange(this.toKey('wait'), 0, -1); | ||
@@ -179,3 +200,3 @@ multi.lrange(this.toKey('paused'), 0, -1); | ||
multi.del(this.toKey('paused')); | ||
return multi.execAsync().then(function(res){ | ||
@@ -188,3 +209,3 @@ var waiting = res[0]; | ||
}); | ||
jobKeys = jobKeys.concat(_.map(paused, function(jobId){ | ||
@@ -196,6 +217,6 @@ return _this.toKey(jobId); | ||
var multi = _this.multi(); | ||
multi.del.apply(multi, jobKeys); | ||
return multi.execAsync(); | ||
} | ||
} | ||
}); | ||
@@ -207,3 +228,3 @@ } | ||
TODO: This pause only pauses the current queue instance, it is not | ||
good enough, we need to pause all instances. It should be great if RENAME can | ||
good enough, we need to pause all instances. It should be great if RENAME can | ||
be used for this. So when pausing we just rename the wait queue to paused. | ||
@@ -218,3 +239,3 @@ BRPOPLPUSH still blocks even when a key does not exist, so it will block | ||
if(this.paused) return this.paused; | ||
var _this = this; | ||
@@ -233,3 +254,3 @@ | ||
}); | ||
return this.paused; | ||
@@ -294,3 +315,3 @@ } | ||
var _this = this; | ||
return this.getNextJob().then(function(job){ | ||
@@ -376,3 +397,3 @@ return _this.processJob(job); | ||
Atomically moves a job from one list to another. | ||
@method moveJob | ||
@@ -385,27 +406,35 @@ */ | ||
Queue.prototype.getWaiting = function(start, end){ | ||
return this.getJobs('wait'); | ||
return this.getJobs('wait', true); | ||
} | ||
Queue.prototype.getActive = function(start, end){ | ||
return this.getJobs('active'); | ||
return this.getJobs('active', true); | ||
} | ||
Queue.prototype.getCompleted = function(start, end){ | ||
Queue.prototype.getCompleted = function(){ | ||
return this.getJobs('completed'); | ||
} | ||
Queue.prototype.getFailed = function(start, end){ | ||
Queue.prototype.getFailed = function(){ | ||
return this.getJobs('failed'); | ||
} | ||
Queue.prototype.getJobs = function(queueType, start, end){ | ||
Queue.prototype.getJobs = function(queueType, isList, start, end){ | ||
var _this = this; | ||
start = _.isUndefined(start) ? 0 : start; | ||
end = _.isUndefined(end) ? -1 : end; | ||
var key = this.toKey(queueType); | ||
var jobs; | ||
if(isList){ | ||
start = _.isUndefined(start) ? 0 : start; | ||
end = _.isUndefined(end) ? -1 : end; | ||
jobs = this.client.lrangeAsync(key, start, end); | ||
}else{ | ||
jobs = this.client.smembersAsync(key); | ||
} | ||
var key = this.toKey(queueType); | ||
//this.client.lrange(key, start, end, function(err, jobIds){ | ||
return this.client.smembersAsync(key).then(function(jobIds){ | ||
return jobs.then(function(jobIds){ | ||
if(jobIds.length){ | ||
@@ -412,0 +441,0 @@ return Promise.all(_.map(jobIds, function(jobId){ |
{ | ||
"name": "bull", | ||
"version": "0.1.5", | ||
"version": "0.1.6", | ||
"description": "Job manager", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -6,9 +6,9 @@ Bull Job Manager | ||
A lightweight, robust and fast job processing queue. | ||
A lightweight, robust and fast job processing queue. | ||
Designed with stability and atomicity in mind. The API is inspired by Kue. | ||
It uses redis for persistence, so the queue is not lost if the server goes | ||
It uses redis for persistence, so the queue is not lost if the server goes | ||
down for any reason. | ||
If you need more features than the ones provided by Bull check | ||
If you need more features than the ones provided by Bull check | ||
[Kue](https://github.com/learnboost/kue) but keep in mind this open | ||
@@ -26,2 +26,4 @@ [issue](https://github.com/LearnBoost/kue/issues/130). | ||
Note that you need a redis version higher or equal than 2.6.12 for bull to work. | ||
Quick Guide | ||
@@ -37,15 +39,15 @@ ----------- | ||
videoQueue.process(function(job, done){ | ||
// job.data contains the custom data passed when the job was created | ||
// job.jobId contains id of this job. | ||
// transcode video asynchronously and report progress | ||
job.progress(42); | ||
// call done when finished | ||
done(); | ||
// or give a error if error | ||
done(Error('error transcoding')); | ||
// If the job throws an unhandled exception it is also handled correctly | ||
@@ -58,9 +60,9 @@ throw (Error('some unexpected error')); | ||
job.progress(42); | ||
// call done when finished | ||
done(); | ||
// or give a error if error | ||
done(Error('error transcoding')); | ||
// If the job throws an unhandled exception it is also handled correctly | ||
@@ -73,9 +75,9 @@ throw (Error('some unexpected error')); | ||
job.progress(42); | ||
// call done when finished | ||
done(); | ||
// or give a error if error | ||
done(Error('error transcoding')); | ||
// If the job throws an unhandled exception it is also handled correctly | ||
@@ -89,3 +91,3 @@ throw (Error('some unexpected error')); | ||
``` | ||
A queue can be paused and resumed: | ||
@@ -130,8 +132,8 @@ ```javascript | ||
``` | ||
Queues are robust and can be run in parallel in several threads or processes | ||
without any risk of hazards or queue corruption. Check this simple example | ||
without any risk of hazards or queue corruption. Check this simple example | ||
using cluster to parallelize jobs accross processes: | ||
```javascript | ||
var | ||
var | ||
Queue = require('bull'), | ||
@@ -147,3 +149,3 @@ cluster = require('cluster'); | ||
} | ||
cluster.on('online', function(worker) { | ||
@@ -155,3 +157,3 @@ // Lets create a few jobs for the queue workers | ||
}); | ||
cluster.on('exit', function(worker, code, signal) { | ||
@@ -174,3 +176,3 @@ console.log('worker ' + worker.process.pid + ' died'); | ||
Bull can also be used for persistent messsage queues. This is a quite useful | ||
feature in some usecases. For example, you can have two servers that need to | ||
feature in some usecases. For example, you can have two servers that need to | ||
communicate with each other. By using a queue the servers do not need to be online | ||
@@ -184,3 +186,3 @@ at the same time, this create a very robust communication channel: | ||
// receiving. | ||
var sendQueue = Queue("server one message queue", 6379, '127.0.0.1'); | ||
@@ -201,8 +203,8 @@ var receiveQueue = Queue("server two message queue", 6379, '127.0.0.1'); | ||
A common pattern is where you have a cluster of queue processors that just | ||
A common pattern is where you have a cluster of queue processors that just | ||
process jobs as fast as they can, and some other services that need to take the | ||
result of this processors and do something with it, maybe storing results in a | ||
database. | ||
database. | ||
The most robust and scalable way to accomplish this is by combining the standard | ||
The most robust and scalable way to accomplish this is by combining the standard | ||
job queue with the message queue pattern: a service sends jobs to the cluster | ||
@@ -220,8 +222,8 @@ just by opening a job queue and adding jobs to it, the cluster will start | ||
* [Queue##add](#add) | ||
* [Queue##add](#count) | ||
* [Queue##add](#empty) | ||
* [Queue##count](#count) | ||
* [Queue##empty](#empty) | ||
* [Job](#job) | ||
* [Job##remove](#remove) | ||
## Reference | ||
@@ -232,8 +234,8 @@ | ||
This is the Queue constructor. It creates a new Queue that is persisted in | ||
Redis. Everytime the same queue is instantiated it tries to process all the | ||
This is the Queue constructor. It creates a new Queue that is persisted in | ||
Redis. Everytime the same queue is instantiated it tries to process all the | ||
old jobs that may exist from a previous unfinished session. | ||
__Arguments__ | ||
```javascript | ||
@@ -248,3 +250,3 @@ queueName {String} A unique name for this Queue. | ||
<a name="process"/> | ||
@@ -259,5 +261,5 @@ #### Queue##process(function(job, done)) | ||
to signal that the job did not complete successfully. | ||
__Arguments__ | ||
```javascript | ||
@@ -269,3 +271,3 @@ jobName {String} A job type name. | ||
--------------------------------------- | ||
<a name="add"/> | ||
@@ -275,7 +277,7 @@ #### Queue##add(data, opts) | ||
Creates a new job and adds it to the queue. If the queue is empty the job | ||
will be executed directly, otherwise it will be placed in the queue and | ||
will be executed directly, otherwise it will be placed in the queue and | ||
executed as soon as possible. | ||
__Arguments__ | ||
```javascript | ||
@@ -286,2 +288,4 @@ data {PlainObject} A plain object with arguments that will be passed | ||
to the job processing function in job.opts | ||
opts.lifo {Boolean} A boolean which, if true, adds the job to the right | ||
of the queue instead of the left (default false) | ||
returns {Promise} A promise that resolves when the job has been succesfully | ||
@@ -302,3 +306,3 @@ added to the queue (or rejects if some error occured). | ||
__Arguments__ | ||
```javascript | ||
@@ -316,3 +320,3 @@ returns {Promise} A promise that resolves with the current jobs count. | ||
__Arguments__ | ||
```javascript | ||
@@ -329,5 +333,5 @@ returns {Promise} A promise that resolves with the queue is emptied. | ||
method needed to update its progress. | ||
The most important property for the user is Job##data that includes the | ||
object that was passed to Queue##add, and that is normally used to | ||
object that was passed to Queue##add, and that is normally used to | ||
perform the job. | ||
@@ -343,3 +347,3 @@ | ||
__Arguments__ | ||
```javascript | ||
@@ -352,3 +356,3 @@ returns {Promise} A promise that resolves when the job is removed. | ||
##License | ||
##License | ||
@@ -355,0 +359,0 @@ (The MIT License) |
var Job = require('../lib/job'); | ||
var Queue = require('../'); | ||
var expect = require('expect.js'); | ||
var bluebird = require('bluebird'); | ||
var Promise = require('bluebird'); | ||
@@ -10,3 +10,3 @@ var STD_QUEUE_NAME = 'test queue'; | ||
var queue; | ||
beforeEach(function(done){ | ||
@@ -16,3 +16,3 @@ queue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1'); | ||
}); | ||
afterEach(function(done){ | ||
@@ -24,40 +24,40 @@ queue.empty().then(function(){ | ||
}) | ||
it('create a queue with standard redis opts', function(done){ | ||
var queue = Queue('standard'); | ||
queue.once('ready', function(){ | ||
expect(queue.client.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.host).to.be('127.0.0.1'); | ||
expect(queue.client.port).to.be(6379); | ||
expect(queue.bclient.port).to.be(6379); | ||
expect(queue.client.selected_db).to.be(0); | ||
expect(queue.bclient.selected_db).to.be(0); | ||
done(); | ||
}); | ||
}); | ||
it('create a queue using custom redis paramters', function(done){ | ||
var queue = Queue('custom', {redis: {DB: 1}}); | ||
queue.once('ready', function(){ | ||
expect(queue.client.host).to.be('127.0.0.1'); | ||
expect(queue.bclient.host).to.be('127.0.0.1'); | ||
expect(queue.client.port).to.be(6379); | ||
expect(queue.bclient.port).to.be(6379); | ||
expect(queue.client.selected_db).to.be(1); | ||
expect(queue.bclient.selected_db).to.be(1); | ||
done(); | ||
}); | ||
}); | ||
}) | ||
it('create a queue with dots in its name', function(done){ | ||
var queue = Queue('using. dots. in.name.'); | ||
queue.process(function(job, jobDone){ | ||
@@ -68,3 +68,3 @@ expect(job.data.foo).to.be.equal('bar') | ||
}) | ||
queue.add({foo: 'bar'}).then(function(job){ | ||
@@ -77,3 +77,21 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
it('should recover from a connection loss', function(done){ | ||
queue = Queue('test connection loss'); | ||
queue.on('error', function(err){ | ||
// error event has to be observed or the exception will bubble up | ||
}).process(function(job, jobDone){ | ||
expect(job.data.foo).to.be.equal('bar'); | ||
jobDone(); | ||
done(); | ||
}); | ||
// Simulate disconnect | ||
queue.bclient.stream.end(); | ||
queue.bclient.emit('error', new Error('ECONNRESET')); | ||
// add something to the queue | ||
queue.add({'foo': 'bar'}); | ||
}); | ||
it('process a job', function(done){ | ||
@@ -85,3 +103,3 @@ queue.process(function(job, jobDone){ | ||
}) | ||
queue.add({foo: 'bar'}).then(function(job){ | ||
@@ -94,3 +112,3 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
it('process a job that updates progress', function(done){ | ||
@@ -102,3 +120,3 @@ queue.process(function(job, jobDone){ | ||
}); | ||
queue.add({foo: 'bar'}).then(function(job){ | ||
@@ -110,3 +128,3 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
queue.on('progress', function(job, progress){ | ||
@@ -118,3 +136,3 @@ expect(job).to.be.ok(); | ||
}); | ||
it('process a job that returns data in the process handler', function(done){ | ||
@@ -125,3 +143,3 @@ queue.process(function(job, jobDone){ | ||
}); | ||
queue.add({foo: 'bar'}).then(function(job){ | ||
@@ -133,3 +151,3 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
queue.on('completed', function(job, data){ | ||
@@ -146,3 +164,3 @@ expect(job).to.be.ok(); | ||
var jobs = [ | ||
queueStalled.add({bar: 'baz'}), | ||
queueStalled.add({bar: 'baz'}), | ||
queueStalled.add({bar1: 'baz1'}), | ||
@@ -152,7 +170,7 @@ queueStalled.add({bar2: 'baz2'}), | ||
bluebird.all(jobs).then(function(){ | ||
Promise.all(jobs).then(function(){ | ||
queueStalled.process(function(job){ | ||
// instead of completing we just close the queue to simulate a crash. | ||
queueStalled.close(); | ||
setTimeout(function(){ | ||
@@ -186,3 +204,3 @@ var queue2 = Queue('test queue stalled', 6379, '127.0.0.1'); | ||
queue.LOCK_RENEW_TIME = 10; | ||
for(var j=0; j<NUM_JOBS_PER_QUEUE; j++){ | ||
@@ -193,3 +211,3 @@ jobs.push(queue.add({job: j})); | ||
bluebird.all(jobs).then(function(){ | ||
Promise.all(jobs).then(function(){ | ||
var processed = 0; | ||
@@ -200,3 +218,3 @@ for(var k=0; k<stalledQueues.length; k++){ | ||
this.close(); | ||
processed ++; | ||
@@ -223,3 +241,3 @@ if(processed === stalledQueues.length){ | ||
}); | ||
it('does not process a job that is being processed when a new queue starts', function(done){ | ||
@@ -230,3 +248,3 @@ var jobId; | ||
}); | ||
queue.process(function(job, jobDone){ | ||
@@ -243,3 +261,3 @@ expect(job.data.foo).to.be.equal('bar') | ||
}); | ||
queue.on('completed', function(job){ | ||
@@ -249,3 +267,3 @@ anotherQueue.close(); | ||
}); | ||
var anotherQueue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1'); | ||
@@ -262,3 +280,3 @@ | ||
}); | ||
it.skip('process stalled jobs without requiring a queue restart'); | ||
@@ -272,3 +290,3 @@ | ||
}) | ||
queue.add({foo: 'bar'}).then(function(job){ | ||
@@ -280,3 +298,3 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
queue.once('failed', function(job, err){ | ||
@@ -289,3 +307,3 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
it('process a job that throws an exception', function(done){ | ||
@@ -297,3 +315,3 @@ var jobError = new Error("Job Failed"); | ||
}); | ||
queue.add({foo: 'bar'}).then(function(job){ | ||
@@ -305,3 +323,3 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
queue.once('failed', function(job, err){ | ||
@@ -314,3 +332,3 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
it.skip('retry a job that fails', function(done){ | ||
@@ -322,3 +340,3 @@ var jobError = new Error("Job Failed"); | ||
}) | ||
queue.add({foo: 'bar'}).then(function(job){ | ||
@@ -330,3 +348,3 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
queue.once('failed', function(job, err){ | ||
@@ -339,7 +357,7 @@ expect(job.jobId).to.be.ok() | ||
}); | ||
it('process several jobs serially', function(done){ | ||
var counter = 1; | ||
var maxJobs = 100; | ||
queue.process(function(job, jobDone){ | ||
@@ -352,3 +370,3 @@ expect(job.data.num).to.be.equal(counter); | ||
}); | ||
for(var i=1; i<=maxJobs; i++){ | ||
@@ -358,3 +376,3 @@ queue.add({foo: 'bar', num: i}); | ||
}); | ||
it('count added, unprocessed jobs', function(done){ | ||
@@ -364,3 +382,3 @@ var counter = 1; | ||
var added = []; | ||
for(var i=1; i<=maxJobs; i++){ | ||
@@ -370,6 +388,6 @@ added.push(queue.add({foo: 'bar', num: i})); | ||
bluebird.all(added).then(function(){ | ||
Promise.all(added).then(function(){ | ||
queue.count().then(function(count){ | ||
expect(count).to.be(100); | ||
queue.empty().then(function(){ | ||
@@ -384,6 +402,6 @@ queue.count().then(function(count){ | ||
}); | ||
it('add jobs to a paused queue', function(done){ | ||
var ispaused = false, counter = 2; | ||
queue.process(function(job, jobDone){ | ||
@@ -396,10 +414,10 @@ expect(ispaused).to.be(false); | ||
}); | ||
queue.pause(); | ||
ispaused = true; | ||
queue.add({foo: 'paused'}); | ||
queue.add({foo: 'paused'}); | ||
setTimeout(function(){ | ||
@@ -411,6 +429,6 @@ ispaused = false; | ||
}); | ||
it('paused a running queue', function(done){ | ||
var ispaused = false, isresumed = true, first = true; | ||
queue.process(function(job, jobDone){ | ||
@@ -420,3 +438,3 @@ expect(ispaused).to.be(false); | ||
jobDone(); | ||
if(first){ | ||
@@ -429,8 +447,8 @@ first = false; | ||
done(); | ||
} | ||
} | ||
}); | ||
queue.add({foo: 'paused'}); | ||
queue.add({foo: 'paused'}); | ||
queue.on('paused', function(){ | ||
@@ -442,9 +460,117 @@ setTimeout(function(){ | ||
}); | ||
queue.on('resumed', function(){ | ||
isresumed = true; | ||
}); | ||
}); | ||
it('process a lifo queue', function(done){ | ||
var currentValue = 0, first = true; | ||
queue = Queue('test lifo'); | ||
queue.once('ready', function(){ | ||
queue.process(function(job, jobDone){ | ||
// Catching the job before the pause | ||
if(first){ | ||
expect(job.data.count).to.be.equal(0); | ||
first = false; | ||
return jobDone(); | ||
} | ||
expect(job.data.count).to.be.equal(currentValue--); | ||
jobDone(); | ||
if(currentValue === 0){ | ||
done(); | ||
} | ||
}); | ||
// Add a job to pend proccessing | ||
queue.add({'count': 0}).then(function(){ | ||
queue.pause().then(function(){ | ||
// Add a series of jobs in a predictable order | ||
fn = function(cb){ | ||
queue.add({'count': ++currentValue}, {'lifo': true}).then(cb); | ||
}; | ||
fn(fn(fn(fn(function(){ | ||
queue.resume(); | ||
})))); | ||
}); | ||
}); | ||
}); | ||
}); | ||
describe("Jobs getters", function(){ | ||
it('should get waitting jobs', function(done){ | ||
Promise.join(queue.add({foo: 'bar'}), queue.add({baz: 'qux'})).then(function(){ | ||
queue.getWaiting().then(function(jobs){ | ||
expect(jobs).to.be.a('array'); | ||
expect(jobs.length).to.be.equal(2); | ||
expect(jobs[1].data.foo).to.be.equal('bar'); | ||
expect(jobs[0].data.baz).to.be.equal('qux'); | ||
done(); | ||
}) | ||
}); | ||
}); | ||
it('should get active jobs', function(done){ | ||
var counter = 2; | ||
queue.process(function(job, jobDone){ | ||
queue.getActive().then(function(jobs){ | ||
expect(jobs).to.be.a('array'); | ||
expect(jobs.length).to.be.equal(1); | ||
expect(jobs[0].data.foo).to.be.equal('bar'); | ||
done(); | ||
}); | ||
jobDone(); | ||
}); | ||
queue.add({foo: 'bar'}); | ||
}); | ||
it('should get completed jobs', function(){ | ||
var counter = 2; | ||
queue.process(function(job, jobDone){ | ||
jobDone(); | ||
}); | ||
queue.on('completed', function(){ | ||
counter --; | ||
if(counter === 0){ | ||
queue.getCompleted().then(function(jobs){ | ||
expect(jobs).to.be.a('array'); | ||
// We need a "empty completed" kind of function. | ||
//expect(jobs.length).to.be.equal(2); | ||
done(); | ||
}); | ||
} | ||
}); | ||
queue.add({foo: 'bar'}); | ||
queue.add({baz: 'qux'}); | ||
}); | ||
it('should get failed jobs', function(done){ | ||
var counter = 2; | ||
queue.process(function(job, jobDone){ | ||
jobDone(Error("Forced error")); | ||
}); | ||
queue.on('failed', function(){ | ||
counter --; | ||
if(counter === 0){ | ||
queue.getFailed().then(function(jobs){ | ||
expect(jobs).to.be.a('array'); | ||
done(); | ||
}); | ||
} | ||
}); | ||
queue.add({foo: 'bar'}); | ||
queue.add({baz: 'qux'}); | ||
}); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
49863
1292
358