Comparing version 0.1.6 to 0.2.0
@@ -5,3 +5,12 @@ | ||
# 0.2.x release | ||
## v0.2.0 | ||
- Change: rename private api `listen` to `start` | ||
- Feature: new event `queue work` before worker start processing each job | ||
- Feature: `add error` and `queue error` events now emits related job as second parameter | ||
- Feature: `remove` can now remove job from any queue | ||
- Enhance: prevent potential memory leak with `run` loop due to long promise chain | ||
# 0.1.x release | ||
@@ -8,0 +17,0 @@ |
@@ -128,3 +128,3 @@ | ||
self.emit('add error', err); | ||
self.emit('add error', err, job); | ||
reject(err); | ||
@@ -139,3 +139,3 @@ | ||
err = new Error('fail to create job data on redis'); | ||
self.emit('add error', err); | ||
self.emit('add error', err, job); | ||
reject(err); | ||
@@ -145,3 +145,3 @@ // duplicate job id should be purged | ||
err = new Error('fail to remove duplicate job id from queue'); | ||
self.emit('add error', err); | ||
self.emit('add error', err, job); | ||
reject(err); | ||
@@ -152,3 +152,3 @@ // redis client SHOULD NOT fail if 2nd command is successful | ||
err = new Error('fail to push job id onto queue'); | ||
self.emit('add error', err); | ||
self.emit('add error', err, job); | ||
reject(err); | ||
@@ -170,8 +170,10 @@ } else { | ||
* | ||
* @param Number id Job id | ||
* @param Number id Job id | ||
* @param String name Queue name | ||
* @return Promise | ||
* @api Public | ||
*/ | ||
Queue.prototype.remove = function(id) { | ||
Queue.prototype.remove = function(id, name) { | ||
name = name || 'work'; | ||
var self = this; | ||
@@ -182,4 +184,9 @@ | ||
if (!self[name + 'Queue']) { | ||
reject(new Error('invalid queue name')); | ||
return; | ||
} | ||
self.client.multi() | ||
.lrem(self.runQueue, 1, id) | ||
.lrem(self[name + 'Queue'], 1, id) | ||
.del(self.prefix + ':' + id) | ||
@@ -314,3 +321,3 @@ .exec(function(err, res) { | ||
this.listen(); | ||
this.start(); | ||
@@ -339,3 +346,3 @@ }; | ||
// once the handler is registered, we can start processing jobs | ||
this.listen(); | ||
this.start(); | ||
@@ -347,6 +354,6 @@ }; | ||
* | ||
* @return Promise | ||
* @return Void | ||
* @api Private | ||
*/ | ||
Queue.prototype.listen = function() { | ||
Queue.prototype.start = function() { | ||
@@ -358,6 +365,3 @@ var self = this; | ||
return this.run().catch(function(err) { | ||
self.running = false; | ||
self.emit('queue exit', err); | ||
}); | ||
this.run(); | ||
@@ -369,3 +373,3 @@ }; | ||
* | ||
* @return Promise | ||
* @return Void | ||
* @api Private | ||
@@ -377,3 +381,4 @@ */ | ||
return this.recoverJob() | ||
// loop | ||
this.recoverJob() | ||
.then(function(res) { | ||
@@ -391,6 +396,10 @@ return self.readJob(res); | ||
self.emit('queue stop'); | ||
// loop | ||
} else { | ||
return self.run(); | ||
return; | ||
} | ||
// tail recursion | ||
self.run(); | ||
}, function(err) { | ||
// exit queue | ||
self.running = false; | ||
self.emit('queue exit', err); | ||
}); | ||
@@ -438,2 +447,5 @@ | ||
// start working on job | ||
self.emit('queue work', job); | ||
if (!self.handler) { | ||
@@ -470,3 +482,3 @@ reject(new Error('job handler must be registered first')); | ||
// job done, remove it and emit event | ||
return self.remove(job.id).then(function() { | ||
return self.remove(job.id, 'run').then(function() { | ||
self.emit('queue ok', job); | ||
@@ -479,3 +491,3 @@ }); | ||
return self.moveJob(job).then(function() { | ||
self.emit('queue error', err); | ||
self.emit('queue error', err, job); | ||
}); | ||
@@ -482,0 +494,0 @@ |
{ | ||
"name": "decent", | ||
"version": "0.1.6", | ||
"version": "0.2.0", | ||
"description": "This is a decent Redis-based job queue for Node.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -199,4 +199,5 @@ | ||
- `queue.emit('queue start')`: queue loop has started. | ||
- `queue.emit('queue work', job)`: queue worker begin to process a `job`. | ||
- `queue.emit('queue ok', job)`: queue worker has processed a `job`. | ||
- `queue.emit('queue error', err)`: queue worker has failed to processed a job and thrown `err` (caught properly, so queue does not exit) | ||
- `queue.emit('queue error', err, job)`: queue worker has failed to processed a `job` and thrown `err` (caught properly, so queue does not exit) | ||
- `queue.emit('queue exit', err)`: queue loop has terminated due to `err`. | ||
@@ -207,4 +208,4 @@ - `queue.emit('queue stop')`: queue loop has stopped gracefully. | ||
- `queue.emit('add ok', job)`: `job` has been added to queue. | ||
- `queue.emit('add error', err)`: failed to add job onto queue due to `err`. | ||
- `queue.emit('add ok', job)`: a `job` has been added to queue. | ||
- `queue.emit('add error', err, job)`: failed to add a `job` onto queue due to `err`. | ||
@@ -224,3 +225,2 @@ | ||
- API for re-queueing failed jobs | ||
- Use case examples | ||
@@ -227,0 +227,0 @@ - Web UI |
135
test/test.js
@@ -178,2 +178,9 @@ | ||
it('should emit event when failed', function() { | ||
var job = { | ||
id: 1 | ||
, data: { a: 1 } | ||
, retry: 0 | ||
, timeout: 60 | ||
}; | ||
queue.client.set(queue.workQueue, 1); | ||
@@ -184,3 +191,6 @@ | ||
return expect(queue.add({ a: 1 })).to.eventually.be.rejectedWith(Error); | ||
return queue.add({ a: 1 }).catch(function(err) { | ||
expect(spy.args[0][0]).to.be.an.instanceof(Error); | ||
expect(spy.args[0][1]).to.deep.equal(job); | ||
}); | ||
}); | ||
@@ -312,3 +322,3 @@ | ||
return queue.remove(job.id).then(function() { | ||
return queue.remove(job.id, 'run').then(function() { | ||
return queue.count('run').then(function(count) { | ||
@@ -321,2 +331,6 @@ expect(count).to.equal(0); | ||
it('should reject if queue name is invalid', function() { | ||
return expect(queue.remove(1, 'invalid')).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if redis return empty error', function() { | ||
@@ -341,3 +355,3 @@ var job = { | ||
return queue.remove({ a: 1 }).catch(function(err) { | ||
return queue.remove({ a: 1 }, 'run').catch(function(err) { | ||
sandbox.restore(); | ||
@@ -364,3 +378,3 @@ | ||
var p = queue.remove(job.id); | ||
var p = queue.remove(job.id, 'run'); | ||
queue.client.stream.destroy(); | ||
@@ -390,3 +404,3 @@ | ||
queue.client.lpush(queue.runQueue, job.id); | ||
queue.client.lpush(queue.workQueue, job.id); | ||
@@ -486,3 +500,3 @@ return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error); | ||
it('should restart listener', function() { | ||
var stub = sinon.stub(queue, 'listen'); | ||
var stub = sinon.stub(queue, 'start'); | ||
queue.restart(); | ||
@@ -493,3 +507,3 @@ expect(stub).to.have.been.calledOnce; | ||
it('should throw error if already running', function() { | ||
var stub = sinon.stub(queue, 'listen'); | ||
var stub = sinon.stub(queue, 'start'); | ||
queue.running = true; | ||
@@ -503,3 +517,3 @@ expect(function() { queue.restart(); }).to.throw(Error); | ||
it('should register a job handler', function() { | ||
var stub = sinon.stub(queue, 'listen'); | ||
var stub = sinon.stub(queue, 'start'); | ||
var handler = function() {}; | ||
@@ -512,3 +526,3 @@ | ||
it('should throw error if handler is not function', function() { | ||
var stub = sinon.stub(queue, 'listen'); | ||
var stub = sinon.stub(queue, 'start'); | ||
var handler = 1; | ||
@@ -520,3 +534,3 @@ | ||
it('should throw error if handler exists', function() { | ||
var stub = sinon.stub(queue, 'listen'); | ||
var stub = sinon.stub(queue, 'start'); | ||
var handler = function() {}; | ||
@@ -529,3 +543,3 @@ | ||
it('should kick start queue listener', function() { | ||
var stub = sinon.stub(queue, 'listen'); | ||
var stub = sinon.stub(queue, 'start'); | ||
var handler = function() {}; | ||
@@ -538,3 +552,3 @@ | ||
describe('listen', function() { | ||
describe('start', function() { | ||
it('should kick start run process', function() { | ||
@@ -544,5 +558,4 @@ var stub = sinon.stub(queue, 'run'); | ||
return queue.listen().then(function() { | ||
expect(stub).to.have.been.calledOnce; | ||
}); | ||
queue.start(); | ||
expect(stub).to.have.been.calledOnce; | ||
}); | ||
@@ -557,20 +570,14 @@ | ||
return queue.listen().then(function() { | ||
expect(spy).to.have.been.calledOnce; | ||
}); | ||
queue.start(); | ||
expect(spy).to.have.been.calledOnce; | ||
}); | ||
it('should emit event on error exit', function() { | ||
var error = new Error('some error'); | ||
it('should set running to true', function() { | ||
expect(queue.running).to.be.false; | ||
var stub = sinon.stub(queue, 'run'); | ||
stub.returns(Promise.reject(error)); | ||
stub.returns(Promise.resolve(true)); | ||
var spy = sinon.spy(); | ||
queue.on('queue exit', spy); | ||
return queue.listen().then(function() { | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWith(error); | ||
}); | ||
queue.start(); | ||
expect(queue.running).to.be.true; | ||
}); | ||
@@ -580,7 +587,10 @@ }); | ||
describe('run', function() { | ||
it('should repeatedly run until error', function() { | ||
it('should repeatedly run until error', function(done) { | ||
var error = new Error('some error'); | ||
var s0 = sinon.stub(queue, 'recoverJob'); | ||
var s1 = sinon.stub(queue, 'readJob'); | ||
var s2 = sinon.stub(queue, 'handleJob'); | ||
s0.returns(Promise.resolve(true)); | ||
s1.returns(Promise.resolve(true)); | ||
@@ -590,10 +600,20 @@ s2.onCall(0).returns(Promise.resolve(true)); | ||
return queue.run().catch(function(err) { | ||
expect(s1).to.have.been.calledTwice; | ||
expect(s2).to.have.been.calledTwice; | ||
expect(err).to.equal(error); | ||
queue.on('queue exit', function(err) { | ||
try { | ||
expect(s0).to.have.been.calledTwice; | ||
expect(s1).to.have.been.calledTwice; | ||
expect(s2).to.have.been.calledTwice; | ||
expect(queue.running).to.be.false; | ||
expect(queue.shutdown).to.be.false; | ||
expect(err).to.equal(error); | ||
done(); | ||
} catch(e) { | ||
done(e); | ||
} | ||
}); | ||
queue.run(); | ||
}); | ||
it('should repeatedly run until shutdown', function() { | ||
it('should repeatedly run until shutdown', function(done) { | ||
var p = function() { | ||
@@ -603,21 +623,28 @@ return new Promise(function(resolve, reject) { | ||
resolve(); | ||
}, 25); | ||
}, 5); | ||
}); | ||
}; | ||
var s0 = sinon.stub(queue, 'recoverJob', p); | ||
var s1 = sinon.stub(queue, 'readJob', p); | ||
var s2 = sinon.stub(queue, 'handleJob', p); | ||
var spy = sinon.spy(); | ||
queue.on('queue stop', spy); | ||
setTimeout(function() { | ||
queue.shutdown = true; | ||
}, 80); | ||
}, 25); | ||
return queue.run().then(function() { | ||
expect(s1).to.have.been.calledTwice; | ||
expect(s2).to.have.been.calledTwice; | ||
expect(spy).to.have.been.calledOnce; | ||
queue.on('queue stop', function() { | ||
try { | ||
expect(s0).to.have.been.calledTwice; | ||
expect(s1).to.have.been.calledTwice; | ||
expect(s2).to.have.been.calledTwice; | ||
expect(queue.running).to.be.false; | ||
expect(queue.shutdown).to.be.false; | ||
done(); | ||
} catch(e) { | ||
done(e); | ||
} | ||
}); | ||
queue.run(); | ||
}); | ||
@@ -674,2 +701,3 @@ }); | ||
expect(spy.args[0][0]).to.be.an.instanceof(Error); | ||
expect(spy.args[0][1]).to.equal(job); | ||
}); | ||
@@ -704,2 +732,3 @@ }); | ||
expect(spy.args[0][0]).to.be.an.instanceof(Error); | ||
expect(spy.args[0][1]).to.equal(job); | ||
}); | ||
@@ -757,2 +786,3 @@ }); | ||
expect(spy.args[0][0]).to.be.an.instanceof(Error); | ||
expect(spy.args[0][1]).to.equal(job); | ||
}); | ||
@@ -785,3 +815,3 @@ }); | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWith(error); | ||
expect(spy).to.have.been.calledWith(error, job); | ||
}); | ||
@@ -812,3 +842,3 @@ }); | ||
it('should emit job done event', function() { | ||
it('should emit job events', function() { | ||
var job = { | ||
@@ -826,8 +856,13 @@ id: 1 | ||
var spy = sinon.spy(); | ||
queue.on('queue ok', spy); | ||
var s1 = sinon.spy(); | ||
queue.on('queue work', s1); | ||
var s2 = sinon.spy(); | ||
queue.on('queue ok', s2); | ||
return queue.handleJob(job).then(function() { | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWith(job); | ||
expect(s1).to.have.been.calledOnce; | ||
expect(s1).to.have.been.calledWith(job); | ||
expect(s2).to.have.been.calledOnce; | ||
expect(s2).to.have.been.calledWith(job); | ||
}); | ||
@@ -863,3 +898,3 @@ }); | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWith(error); | ||
expect(spy).to.have.been.calledWith(error, job); | ||
}); | ||
@@ -866,0 +901,0 @@ }); |
60782
1733