Comparing version 0.1.5 to 0.1.6
changelog | ||
Changelog | ||
========= | ||
# v1 release | ||
# 0.1.x release | ||
## v0.1.6 | ||
- Enhance: stalled runQueue job are moved to workQueue on startup | ||
- Enhance: update to verbose error message | ||
- Enhance: reached 100% code coverage | ||
- Enhance: mark api as public/private | ||
## v0.1.5 | ||
@@ -8,0 +16,0 @@ |
@@ -29,3 +29,3 @@ | ||
if (!name) { | ||
throw new Error('queue name required'); | ||
throw new Error('queue name is required'); | ||
} | ||
@@ -71,16 +71,161 @@ | ||
// queue status | ||
this.shutdown = false; | ||
this.running = false; | ||
} | ||
/** | ||
* Increment queue id and return it | ||
* Add a job onto the work queue, overwrite duplicate job | ||
* | ||
* @param Object data Actual data for worker to process | ||
* @param Object opts Additional job options | ||
* @return Promise | ||
* @api Public | ||
*/ | ||
Queue.prototype.nextId = function() { | ||
Queue.prototype.add = function(data, opts) { | ||
var self = this; | ||
// note that queue id always increment, even if we don't use it | ||
return this.nextId().then(function(id) { | ||
if (!data || typeof data !== 'object') { | ||
throw new Error('job data payload must be an object'); | ||
} | ||
opts = opts || {}; | ||
// job structure | ||
var job = { | ||
id: opts.id || id | ||
, data: data | ||
, retry: opts.retry || 0 | ||
, timeout: opts.timeout || 60 | ||
}; | ||
// format job for redis, invalid data will reject promise | ||
var rjob = self.toClient(job); | ||
// add job as hash, push its id onto queue | ||
// note that overwrite existing job will requeue job id | ||
return new Promise(function(resolve, reject) { | ||
self.client.multi() | ||
.hmset(self.prefix + ':' + rjob.id, rjob) | ||
.lrem(self.workQueue, 1, rjob.id) | ||
.lpush(self.workQueue, rjob.id) | ||
.exec(function(err, res) { | ||
// client error | ||
if (err) { | ||
// only return the first error | ||
// redis client SHOULD always return non-empty array of Error | ||
// but better safe than sorry | ||
if (err.length > 0) { | ||
err = err[0]; | ||
} else { | ||
err = new Error('empty error from redis client'); | ||
} | ||
self.emit('add error', err); | ||
reject(err); | ||
// command failure | ||
// we need to check each command is returning expected result | ||
// as err is null in this case, ref: http://git.io/bT5C4Q | ||
// redis client SHOULD always return OK for set command | ||
} else if (res[0] !== 'OK') { | ||
err = new Error('fail to create job data on redis'); | ||
self.emit('add error', err); | ||
reject(err); | ||
// duplicate job id should be purged | ||
} else if (isNaN(parseInt(res[1], '10')) || res[1] > 1) { | ||
err = new Error('fail to remove duplicate job id from queue'); | ||
self.emit('add error', err); | ||
reject(err); | ||
// redis client SHOULD NOT fail if 2nd command is successful | ||
// job queue must not be empty now | ||
} else if (isNaN(parseInt(res[2], '10')) || res[2] < 1) { | ||
err = new Error('fail to push job id onto queue'); | ||
self.emit('add error', err); | ||
reject(err); | ||
} else { | ||
self.emit('add ok', job); | ||
resolve(job); | ||
} | ||
}); | ||
}); | ||
}) | ||
}; | ||
/** | ||
* Remove a job from queue given the job id | ||
* | ||
* @param Number id Job id | ||
* @return Promise | ||
* @api Public | ||
*/ | ||
Queue.prototype.remove = function(id) { | ||
var self = this; | ||
// job done, remove it | ||
return new Promise(function(resolve, reject) { | ||
self.client.incr(self.prefix + ':id', function(err, res) { | ||
self.client.multi() | ||
.lrem(self.runQueue, 1, id) | ||
.del(self.prefix + ':' + id) | ||
.exec(function(err, res) { | ||
// see prototype.add on why verbose checks are needed | ||
// client error | ||
if (err) { | ||
if (err.length > 0) { | ||
err = err[0]; | ||
} else { | ||
err = new Error('empty error from redis client'); | ||
} | ||
reject(err); | ||
// command error | ||
} else if (res[0] != 1) { | ||
err = new Error('job id missing from queue'); | ||
reject(err); | ||
} else if (res[1] != 1) { | ||
err = new Error('job data missing from redis'); | ||
reject(err); | ||
} else { | ||
resolve(); | ||
} | ||
}); | ||
}); | ||
}; | ||
/** | ||
* Report the current number of jobs in work queue | ||
* | ||
* @param String name Queue name | ||
* @return Promise | ||
* @api Public | ||
*/ | ||
Queue.prototype.count = function(name) { | ||
name = name || 'work'; | ||
var self = this; | ||
return new Promise(function(resolve, reject) { | ||
if (!self[name + 'Queue']) { | ||
reject(new Error('invalid queue name')); | ||
return; | ||
} | ||
self.client.llen(self[name + 'Queue'], function(err, res) { | ||
if (err) { | ||
@@ -98,7 +243,9 @@ reject(err); | ||
/** | ||
* Retrieve the next job on work queue, put it on the run queue | ||
* Return a job without removing it from redis | ||
* | ||
* @param Number id Job id | ||
* @return Promise | ||
* @api Public | ||
*/ | ||
Queue.prototype.nextJob = function() { | ||
Queue.prototype.get = function(id) { | ||
@@ -109,20 +256,27 @@ var self = this; | ||
self.bclient.brpoplpush( | ||
self.workQueue | ||
, self.runQueue | ||
, self.config.blockTimeout | ||
, function(err, id) { | ||
// client error | ||
if (err) { | ||
reject(err); | ||
if (!id) { | ||
reject(new Error('job id required')); | ||
return; | ||
} | ||
// blocking timeout, return special code | ||
} else if (id === null) { | ||
resolve(self.status_timeout); | ||
// get job | ||
self.client.hgetall(self.prefix + ':' + id, function(err, job) { | ||
// client error | ||
if (err) { | ||
reject(err); | ||
} else { | ||
resolve(self.get(id)); | ||
} else if (job === null) { | ||
reject(new Error('job data missing from redis')); | ||
} else { | ||
// format job for client, handle invalid job | ||
try { | ||
job = self.fromClient(job); | ||
} catch(err) { | ||
reject(err); | ||
} | ||
resolve(job); | ||
} | ||
); | ||
}); | ||
@@ -134,16 +288,10 @@ }); | ||
/** | ||
* Convert job data into a format supported by redis client | ||
* Stop queue processing gracefully | ||
* | ||
* @param Object job Queue format | ||
* @return Object | ||
* @return Void | ||
* @api Public | ||
*/ | ||
Queue.prototype.toClient = function(job) { | ||
Queue.prototype.stop = function() { | ||
// all values must be primitive type | ||
return { | ||
id: job.id | ||
, data: JSON.stringify(job.data) | ||
, retry: job.retry | ||
, timeout: job.timeout | ||
}; | ||
this.shutdown = true; | ||
@@ -153,17 +301,16 @@ }; | ||
/** | ||
* Convert redis data into the original job format | ||
* Restart queue processing | ||
* | ||
* @param Object job Redis format | ||
* @return Object | ||
* @return Void | ||
* @api Public | ||
*/ | ||
Queue.prototype.fromClient = function(job) { | ||
Queue.prototype.restart = function() { | ||
// values in redis are stored as string | ||
return { | ||
id: parseInt(job.id, 10) | ||
, data: JSON.parse(job.data) | ||
, retry: parseInt(job.retry, 10) | ||
, timeout: parseInt(job.timeout, 10) | ||
}; | ||
// prevent duplicate worker | ||
if (this.running) { | ||
throw new Error('worker is already running'); | ||
} | ||
this.listen(); | ||
}; | ||
@@ -176,2 +323,3 @@ | ||
* @return Void | ||
* @api Public | ||
*/ | ||
@@ -181,7 +329,7 @@ Queue.prototype.worker = function(handler) { | ||
if (typeof handler !== 'function') { | ||
throw new Error('worker must be a function'); | ||
throw new Error('job handler must be a function'); | ||
} | ||
if (this.handler) { | ||
throw new Error('worker can only be registered once'); | ||
throw new Error('job handler can only be registered once'); | ||
} | ||
@@ -200,2 +348,3 @@ | ||
* @return Promise | ||
* @api Private | ||
*/ | ||
@@ -206,5 +355,7 @@ Queue.prototype.listen = function() { | ||
this.running = true; | ||
this.emit('queue start'); | ||
return this.run().catch(function(err) { | ||
self.running = false; | ||
self.emit('queue exit', err); | ||
@@ -219,2 +370,3 @@ }); | ||
* @return Promise | ||
* @api Private | ||
*/ | ||
@@ -225,5 +377,5 @@ Queue.prototype.run = function() { | ||
return this.nextJob() | ||
return this.recoverJob() | ||
.then(function(res) { | ||
return self.handleStatus(res); | ||
return self.readJob(res); | ||
}) | ||
@@ -234,5 +386,6 @@ .then(function(job) { | ||
.then(function() { | ||
// handle shutdown state | ||
// handle graceful shutdown | ||
if (self.shutdown) { | ||
self.shutdown = false; | ||
self.running = false; | ||
self.emit('queue stop'); | ||
@@ -248,20 +401,21 @@ // loop | ||
/** | ||
* Handle known status code from nextJob | ||
* Wait for job on work queue, loop until found | ||
* | ||
* @param Mixed res Result from nextJob | ||
* @return Mixed | ||
* @param Mixed res Can be empty, status code or job | ||
* @return Promise | ||
* @api Private | ||
*/ | ||
Queue.prototype.handleStatus = function(res) { | ||
Queue.prototype.readJob = function(res) { | ||
var self = this; | ||
// blocking timeout, try again | ||
if (res === this.status_timeout) { | ||
// first run, or blocking timeout | ||
if (!res || res === this.status_timeout) { | ||
return this.nextJob().then(function(res) { | ||
return self.handleStatus(res); | ||
return self.readJob(res); | ||
}); | ||
// pass on result | ||
// pass on job object | ||
} else { | ||
return res; | ||
return Promise.resolve(res); | ||
} | ||
@@ -272,6 +426,7 @@ | ||
/** | ||
* Handle job from nextJob | ||
* Process job with handler | ||
* | ||
* @param Object job Formatted job | ||
* @return Promise | ||
* @api Private | ||
*/ | ||
@@ -286,3 +441,3 @@ Queue.prototype.handleJob = function(job) { | ||
if (!self.handler) { | ||
reject(new Error('worker must be registered first')); | ||
reject(new Error('job handler must be registered first')); | ||
return; | ||
@@ -294,3 +449,3 @@ } | ||
setTimeout(function() { | ||
reject(new Error('job timeout')); | ||
reject(new Error('job timeout threshold reached')); | ||
}, job.timeout * 1000); | ||
@@ -338,2 +493,3 @@ } | ||
* @return Promise | ||
* @api Private | ||
*/ | ||
@@ -367,3 +523,3 @@ Queue.prototype.moveJob = function(job) { | ||
} else { | ||
err = new Error('unknown error from redis client'); | ||
err = new Error('empty error from redis client'); | ||
} | ||
@@ -374,9 +530,7 @@ | ||
// command error | ||
// job id must exist on queue | ||
} else if (res[0] === null) { | ||
err = new Error('unable to find job id on queue'); | ||
err = new Error('job id missing from queue'); | ||
reject(err); | ||
// retry field must exist already | ||
} else if (isNaN(parseInt(res[1], '10')) || res[1] > 0) { | ||
err = new Error('unable to update job retry count'); | ||
err = new Error('partial job data, retry count missing'); | ||
reject(err); | ||
@@ -393,43 +547,37 @@ } else { | ||
/** | ||
* Remove a job from queue given the job id | ||
* Recover job from runQueue back to start of workQueue | ||
* | ||
* @param Number id Job id | ||
* @return Promise | ||
* @api Private | ||
*/ | ||
Queue.prototype.remove = function(id) { | ||
Queue.prototype.recoverJob = function() { | ||
var self = this; | ||
// job done, remove it | ||
return new Promise(function(resolve, reject) { | ||
return this.count('run').then(function(count) { | ||
self.client.multi() | ||
.lrem(self.runQueue, 1, id) | ||
.del(self.prefix + ':' + id) | ||
.exec(function(err, res) { | ||
// see prototype.add on why verbose checks are needed | ||
// nothing to do if last session shutdown gracefully | ||
if (count === 0) { | ||
return; | ||
// client error | ||
if (err) { | ||
if (err.length > 0) { | ||
err = err[0]; | ||
// by design there should only be at max 1 job in runQueue | ||
} else if (count > 1) { | ||
return Promise.reject( | ||
new Error('more than 1 job in queue, purge manually') | ||
); | ||
// move the job to work queue, note that retry count does not increase | ||
} else { | ||
return new Promise(function(resolve, reject) { | ||
self.client.rpoplpush(self.runQueue, self.workQueue, function(err, res) { | ||
if (err) { | ||
reject(err); | ||
} else if (res === null) { | ||
reject(new Error('job id missing from queue')); | ||
} else { | ||
err = new Error('unknown error from redis client'); | ||
resolve(); | ||
} | ||
reject(err); | ||
// command error | ||
// job id must exist on queue | ||
} else if (res[0] != 1) { | ||
err = new Error('unable to remove job id'); | ||
reject(err); | ||
// job data must exist | ||
} else if (res[1] != 1) { | ||
err = new Error('unable to remove job data'); | ||
reject(err); | ||
} else { | ||
resolve(); | ||
} | ||
}); | ||
}); | ||
} | ||
@@ -441,39 +589,9 @@ }); | ||
/** | ||
* Stop queue processing gracefully | ||
* Increment queue id and return it | ||
* | ||
* @return Void | ||
*/ | ||
Queue.prototype.stop = function() { | ||
this.shutdown = true; | ||
}; | ||
/** | ||
* Restart queue processing | ||
* | ||
* @return Void | ||
*/ | ||
Queue.prototype.restart = function() { | ||
this.listen(); | ||
}; | ||
/** | ||
* Re-run all previously failed jobs | ||
* | ||
* @return Promise | ||
* @api Private | ||
*/ | ||
//Queue.prototype.requeue = function() {}; | ||
Queue.prototype.nextId = function() { | ||
/** | ||
* Report the current number of jobs in work queue | ||
* | ||
* @param String name Queue name | ||
* @return Promise | ||
*/ | ||
Queue.prototype.count = function(name) { | ||
name = name || 'work'; | ||
var self = this; | ||
@@ -483,8 +601,3 @@ | ||
if (!self[name + 'Queue']) { | ||
reject(new Error('invalid queue name')); | ||
return; | ||
} | ||
self.client.llen(self[name + 'Queue'], function(err, res) { | ||
self.client.incr(self.prefix + ':id', function(err, res) { | ||
if (err) { | ||
@@ -502,8 +615,8 @@ reject(err); | ||
/** | ||
* Return a job without removing it from redis | ||
* Retrieve the next job on work queue, put it on the run queue | ||
* | ||
* @param Number id Job id | ||
* @return Promise | ||
* @api Private | ||
*/ | ||
Queue.prototype.get = function(id) { | ||
Queue.prototype.nextJob = function() { | ||
@@ -514,28 +627,20 @@ var self = this; | ||
if (!id) { | ||
reject(new Error('job id missing')); | ||
return; | ||
} | ||
self.bclient.brpoplpush( | ||
self.workQueue | ||
, self.runQueue | ||
, self.config.blockTimeout | ||
, function(err, id) { | ||
// client error | ||
if (err) { | ||
reject(err); | ||
// get job | ||
self.client.hgetall(self.prefix + ':' + id, function(err, job) { | ||
// client error | ||
if (err) { | ||
reject(err); | ||
// blocking timeout, return special code | ||
} else if (id === null) { | ||
resolve(self.status_timeout); | ||
// job data missing | ||
} else if (job === null) { | ||
reject(new Error('job data missing')); | ||
} else { | ||
// format job for client, handle invalid job | ||
try { | ||
job = self.fromClient(job); | ||
} catch(err) { | ||
reject(err); | ||
} else { | ||
resolve(self.get(id)); | ||
} | ||
resolve(job); | ||
} | ||
}); | ||
); | ||
@@ -547,85 +652,37 @@ }); | ||
/** | ||
* Add a job onto the work queue, overwrite duplicate job | ||
* Convert job data into a format supported by redis client | ||
* | ||
* @param Object data Actual data for worker to process | ||
* @param Object opts Additional job options | ||
* @return Promise | ||
* @param Object job Queue format | ||
* @return Object | ||
* @api Private | ||
*/ | ||
Queue.prototype.add = function(data, opts) { | ||
Queue.prototype.toClient = function(job) { | ||
var self = this; | ||
// all values must be primitive type | ||
return { | ||
id: job.id | ||
, data: JSON.stringify(job.data) | ||
, retry: job.retry | ||
, timeout: job.timeout | ||
}; | ||
// note that queue id always increment, even if we don't use it | ||
return this.nextId().then(function(id) { | ||
}; | ||
if (!data || typeof data !== 'object') { | ||
throw new Error('job data must be an object'); | ||
} | ||
/** | ||
* Convert redis data into the original job format | ||
* | ||
* @param Object job Redis format | ||
* @return Object | ||
* @api Private | ||
*/ | ||
Queue.prototype.fromClient = function(job) { | ||
opts = opts || {}; | ||
// values in redis are stored as string | ||
return { | ||
id: parseInt(job.id, 10) | ||
, data: JSON.parse(job.data) | ||
, retry: parseInt(job.retry, 10) | ||
, timeout: parseInt(job.timeout, 10) | ||
}; | ||
// job structure | ||
var job = { | ||
id: opts.id || id | ||
, data: data | ||
, retry: opts.retry || 0 | ||
, timeout: opts.timeout || 60 | ||
}; | ||
// format job for redis, invalid data will reject promise | ||
var rjob = self.toClient(job); | ||
// add job as hash, push its id onto queue | ||
// note that overwrite existing job will requeue job id | ||
return new Promise(function(resolve, reject) { | ||
self.client.multi() | ||
.hmset(self.prefix + ':' + rjob.id, rjob) | ||
.lrem(self.workQueue, 1, rjob.id) | ||
.lpush(self.workQueue, rjob.id) | ||
.exec(function(err, res) { | ||
// client error | ||
if (err) { | ||
// only return the first error | ||
// redis client SHOULD always return non-empty array of Error | ||
// but better safe than sorry | ||
if (err.length > 0) { | ||
err = err[0]; | ||
} else { | ||
err = new Error('unknown error from redis client'); | ||
} | ||
self.emit('add error', err); | ||
reject(err); | ||
// command failure | ||
// we need to check each command is returning expected result | ||
// as err is null in this case, ref: http://git.io/bT5C4Q | ||
// redis client SHOULD always return OK for set command | ||
} else if (res[0] !== 'OK') { | ||
err = new Error('unable to set job item'); | ||
self.emit('add error', err); | ||
reject(err); | ||
// duplicate job id should be purged | ||
} else if (isNaN(parseInt(res[1], '10')) || res[1] > 1) { | ||
err = new Error('unable to purge duplicate job id'); | ||
self.emit('add error', err); | ||
reject(err); | ||
// redis client SHOULD NOT fail if 2nd command is successful | ||
// job queue must not be empty now | ||
} else if (isNaN(parseInt(res[2], '10')) || res[2] < 1) { | ||
err = new Error('unable to add job id'); | ||
self.emit('add error', err); | ||
reject(err); | ||
} else { | ||
self.emit('add ok', job); | ||
resolve(job); | ||
} | ||
}); | ||
}); | ||
}) | ||
}; | ||
@@ -637,2 +694,3 @@ | ||
* @return Void | ||
* @api Private | ||
*/ | ||
@@ -651,2 +709,3 @@ Queue.prototype.clientReady = function() { | ||
* @return Void | ||
* @api Private | ||
*/ | ||
@@ -664,4 +723,5 @@ Queue.prototype.clientConnect = function() { | ||
* | ||
* @param Object err Error from redis client | ||
* @param Object err Error from redis client | ||
* @return Void | ||
* @api Private | ||
*/ | ||
@@ -680,2 +740,3 @@ Queue.prototype.clientError = function(err) { | ||
* @return Void | ||
* @api Private | ||
*/ | ||
@@ -692,2 +753,3 @@ Queue.prototype.clientEnd = function() { | ||
* @return Void | ||
* @api Private | ||
*/ | ||
@@ -709,2 +771,3 @@ Queue.prototype.clientDrain = function() { | ||
* @return Void | ||
* @api Private | ||
*/ | ||
@@ -719,2 +782,1 @@ Queue.prototype.clientIdle = function() { | ||
}; | ||
{ | ||
"name": "decent", | ||
"version": "0.1.5", | ||
"version": "0.1.6", | ||
"description": "This is a decent Redis-based job queue for Node.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -22,4 +22,5 @@ | ||
- **Simple API**, powered by `Promise`, works in harmony with your generator library. | ||
- **Automatic job clean up and recovery**, no need to purge jobs manually. | ||
- **Proper code coverage**, we put extra emphasis on negative tests, because that's when most queues fall apart and cause headaches. | ||
- **Annotated source code**, less than 700 loc in total. | ||
- **Annotated source code**, less than 800 loc in total. | ||
- No dependency besides `redis` driver, make use of native promise whenever possible, fallback to `bluebird` for older Node release. | ||
@@ -223,3 +224,3 @@ - Rich events to aid automation, status monitoring or building larger pipeline. | ||
- API for re-queueing failed jobs | ||
- Use-case examples | ||
- Use case examples | ||
- Web UI | ||
@@ -226,0 +227,0 @@ |
739
test/test.js
@@ -15,2 +15,3 @@ | ||
var decent = require('../index.js'); | ||
var QueueClass = require('../lib/decent.js'); | ||
var EventEmitter = require('events').EventEmitter; | ||
@@ -57,3 +58,5 @@ var Promise = require('native-or-bluebird'); | ||
describe('constructor', function() { | ||
it('should return a decent instance properly', function() { | ||
it('should return a decent instance', function() { | ||
expect(queue).to.be.an.instanceof(QueueClass); | ||
expect(queue.name).to.equal('test'); | ||
@@ -78,2 +81,4 @@ expect(queue.port).to.equal(6379); | ||
expect(queue.status_timeout).to.equal(1); | ||
expect(queue.running).to.be.false; | ||
expect(queue.shutdown).to.be.false; | ||
}); | ||
@@ -98,69 +103,202 @@ | ||
describe('nextId', function() { | ||
it('should setup queue id and return it', function() { | ||
return expect(queue.nextId()).to.eventually.equal(1); | ||
describe('add', function() { | ||
it('should reject empty data', function() { | ||
return expect(queue.add()).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should increment queue id and return it', function() { | ||
queue.client.set(queue.prefix + ':id', 5); | ||
it('should reject non-object data', function() { | ||
return expect(queue.add('invalid')).to.eventually.be.rejectedWith(Error); | ||
}); | ||
return expect(queue.nextId()).to.eventually.equal(6); | ||
it('should add a new job to queue', function() { | ||
return queue.add({ a: 1 }).then(function(job) { | ||
queue.client.hgetall(queue.prefix + ':' + job.id, function(err, res) { | ||
expect(res.id).to.equal('1'); | ||
expect(res.data).to.equal(JSON.stringify({ a: 1 })); | ||
expect(res.retry).to.equal('0'); | ||
expect(res.timeout).to.equal('60'); | ||
}); | ||
}); | ||
}); | ||
it('should reject if queue id is not number', function() { | ||
queue.client.hmset(queue.prefix + ':id', { a: 1 }); | ||
it('should return the added job', function() { | ||
return queue.add({ a: 1 }).then(function(job) { | ||
expect(job.id).to.equal(1); | ||
expect(job.data).to.deep.equal({ a: 1 }); | ||
expect(job.retry).to.equal(0); | ||
expect(job.timeout).to.equal(60); | ||
}); | ||
}); | ||
return expect(queue.nextId()).to.eventually.be.rejectedWith(Error); | ||
it('should allow custom job options', function() { | ||
return queue.add({ a: 1 }, { retry: 1, timeout: 120 }).then(function(job) { | ||
expect(job.id).to.equal(1); | ||
expect(job.data).to.deep.equal({ a: 1 }); | ||
expect(job.retry).to.equal(1); | ||
expect(job.timeout).to.equal(120); | ||
}); | ||
}); | ||
}); | ||
describe('nextJob', function() { | ||
it('should return the next job on queue', function() { | ||
it('should overwrite existing job, and requeue job id', function() { | ||
return queue.add({ a: 1 }, { timeout: 120 }).then(function(job) { | ||
return queue.add({ b: 1 }, { id: job.id }).then(function() { | ||
queue.client.hgetall(queue.prefix + ':' + job.id, function(err, res) { | ||
expect(res.id).to.equal('1'); | ||
expect(res.data).to.equal(JSON.stringify({ b: 1 })); | ||
expect(res.retry).to.equal('0'); | ||
expect(res.timeout).to.equal('60'); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('should increment job id on each call', function() { | ||
queue.add({ a: 1 }); | ||
return expect(queue.nextJob()).to.eventually.be.fulfilled; | ||
queue.add({ b: 1 }); | ||
return queue.add({ c: 1 }).then(function(job) { | ||
expect(job.id).to.equal(3); | ||
expect(job.data).to.deep.equal({ c: 1 }); | ||
}); | ||
}); | ||
it('should return the job properly formatted', function() { | ||
queue.add({ a: 1 }) | ||
it('should reject if data has cyclic structure', function() { | ||
var testObj = {}; | ||
testObj.key = 'value'; | ||
testObj.cycle = testObj; | ||
return expect(queue.add(testObj)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
return queue.nextJob().then(function(job) { | ||
expect(job.id).to.equal(1); | ||
expect(job.data).to.deep.equal({ a: 1 }); | ||
expect(job.retry).to.equal(0); | ||
expect(job.timeout).to.equal(60); | ||
it('should emit event when done', function() { | ||
var spy = sinon.spy(); | ||
queue.on('add ok', spy); | ||
return queue.add({ a: 1 }).then(function(job) { | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWithMatch(job); | ||
}); | ||
}); | ||
it('should wait for next job to be available', function() { | ||
setTimeout(function() { | ||
queue.add({ a: 1 }); | ||
}, 25); | ||
return expect(queue.nextJob()).to.eventually.be.fulfilled; | ||
it('should emit event when failed', function() { | ||
queue.client.set(queue.workQueue, 1); | ||
var spy = sinon.spy(); | ||
queue.on('add error', spy); | ||
return expect(queue.add({ a: 1 })).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should block for n seconds before returning status_timeout', function() { | ||
var stub = sinon.stub(queue.bclient, 'brpoplpush', function(a1, a2, a3, cb) { | ||
setTimeout(function() { | ||
cb(null, null); | ||
}, 25); | ||
it('should reject if redis return empty error', function() { | ||
var sandbox = sinon.sandbox.create(); | ||
var s0 = sandbox.stub(queue.client, 'multi').returnsThis(); | ||
var s1 = sandbox.stub(queue.client, 'hmset').returnsThis(); | ||
var s2 = sandbox.stub(queue.client, 'lrem').returnsThis(); | ||
var s3 = sandbox.stub(queue.client, 'lpush').returnsThis(); | ||
var s4 = sandbox.stub(queue.client, 'exec', function(cb) { | ||
cb([], null); | ||
}); | ||
return expect(queue.nextJob()).to.eventually.equal(queue.status_timeout); | ||
var spy = sinon.spy(); | ||
queue.on('add error', spy); | ||
return queue.add({ a: 1 }).catch(function(err) { | ||
sandbox.restore(); | ||
expect(s0).to.have.been.calledOnce; | ||
expect(s1).to.have.been.calledOnce; | ||
expect(s2).to.have.been.calledOnce; | ||
expect(s3).to.have.been.calledOnce; | ||
expect(s4).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWith(err); | ||
}); | ||
}); | ||
it('should reject on connection failure', function() { | ||
var p = queue.nextJob(); | ||
it('should reject if redis return array of errors', function() { | ||
var error = new Error('some error'); | ||
var sandbox = sinon.sandbox.create(); | ||
var s0 = sandbox.stub(queue.client, 'multi').returnsThis(); | ||
var s1 = sandbox.stub(queue.client, 'hmset').returnsThis(); | ||
var s2 = sandbox.stub(queue.client, 'lrem').returnsThis(); | ||
var s3 = sandbox.stub(queue.client, 'lpush').returnsThis(); | ||
var s4 = sandbox.stub(queue.client, 'exec', function(cb) { | ||
cb([error], null); | ||
}); | ||
setTimeout(function() { | ||
queue.bclient.stream.destroy(); | ||
}, 25); | ||
var spy = sinon.spy(); | ||
queue.on('add error', spy); | ||
return expect(p).to.eventually.be.rejectedWith(Error); | ||
return queue.add({ a: 1 }).catch(function(err) { | ||
sandbox.restore(); | ||
expect(spy).to.have.been.calledWith(error); | ||
expect(err).to.equal(error); | ||
}); | ||
}); | ||
it('should reject if hmset response has unexpected value', function() { | ||
var sandbox = sinon.sandbox.create(); | ||
var s0 = sinon.stub(queue.client, 'multi').returnsThis(); | ||
var s1 = sinon.stub(queue.client, 'hmset').returnsThis(); | ||
var s2 = sinon.stub(queue.client, 'lrem').returnsThis(); | ||
var s3 = sinon.stub(queue.client, 'lpush').returnsThis(); | ||
var s4 = sinon.stub(queue.client, 'exec', function(cb) { | ||
cb(null, ['NOT OK']); | ||
}); | ||
var spy = sinon.spy(); | ||
queue.on('add error', spy); | ||
return queue.add({ a: 1 }).catch(function(err) { | ||
sandbox.restore(); | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWith(err); | ||
}); | ||
}); | ||
it('should reject if lrem response has unexpected value', function() { | ||
var sandbox = sinon.sandbox.create(); | ||
var s0 = sinon.stub(queue.client, 'multi').returnsThis(); | ||
var s1 = sinon.stub(queue.client, 'hmset').returnsThis(); | ||
var s2 = sinon.stub(queue.client, 'lrem').returnsThis(); | ||
var s3 = sinon.stub(queue.client, 'lpush').returnsThis(); | ||
var s4 = sinon.stub(queue.client, 'exec', function(cb) { | ||
cb(null, ['OK', 2]); | ||
}); | ||
var spy = sinon.spy(); | ||
queue.on('add error', spy); | ||
return queue.add({ a: 1 }).catch(function(err) { | ||
sandbox.restore(); | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWith(err); | ||
}); | ||
}); | ||
it('should reject if lpush response has unexpected value', function() { | ||
var sandbox = sinon.sandbox.create(); | ||
var s0 = sinon.stub(queue.client, 'multi').returnsThis(); | ||
var s1 = sinon.stub(queue.client, 'hmset').returnsThis(); | ||
var s2 = sinon.stub(queue.client, 'lrem').returnsThis(); | ||
var s3 = sinon.stub(queue.client, 'lpush').returnsThis(); | ||
var s4 = sinon.stub(queue.client, 'exec', function(cb) { | ||
cb(null, ['OK', 0, 0]); | ||
}); | ||
var spy = sinon.spy(); | ||
queue.on('add error', spy); | ||
return queue.add({ a: 1 }).catch(function(err) { | ||
sandbox.restore(); | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWith(err); | ||
}); | ||
}); | ||
}); | ||
describe('toClient', function() { | ||
it('should convert job into redis format', function() { | ||
describe('remove', function() { | ||
it('should remove job from queue and purge job data', function() { | ||
var job = { | ||
@@ -170,21 +308,50 @@ id: 1 | ||
, retry: 0 | ||
, timeout: 120 | ||
, timeout: 60 | ||
}; | ||
expect(queue.toClient(job)).to.deep.equal({ | ||
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job)); | ||
queue.client.lpush(queue.runQueue, job.id); | ||
return queue.remove(job.id).then(function() { | ||
return queue.count('run').then(function(count) { | ||
expect(count).to.equal(0); | ||
return expect(queue.get(job.id)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
}); | ||
}); | ||
it('should reject if redis return empty error', function() { | ||
var job = { | ||
id: 1 | ||
, data: '{"a":1}' | ||
, data: { a: 1 } | ||
, retry: 0 | ||
, timeout: 120 | ||
, timeout: 60 | ||
}; | ||
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job)); | ||
queue.client.lpush(queue.runQueue, job.id); | ||
var sandbox = sinon.sandbox.create(); | ||
var s0 = sandbox.stub(queue.client, 'multi').returnsThis(); | ||
var s1 = sandbox.stub(queue.client, 'lrem').returnsThis(); | ||
var s2 = sandbox.stub(queue.client, 'del').returnsThis(); | ||
var s3 = sandbox.stub(queue.client, 'exec', function(cb) { | ||
cb([], null); | ||
}); | ||
return queue.remove({ a: 1 }).catch(function(err) { | ||
sandbox.restore(); | ||
expect(s0).to.have.been.calledOnce; | ||
expect(s1).to.have.been.calledOnce; | ||
expect(s2).to.have.been.calledOnce; | ||
expect(s3).to.have.been.calledOnce; | ||
expect(err).to.be.an.instanceof(Error); | ||
}); | ||
}); | ||
it('should trigger error if job data is not serializable', function() { | ||
var testObj = {}; | ||
testObj.key = 'value'; | ||
testObj.cycle = testObj; | ||
it('should reject if redis return array of errors', function() { | ||
var job = { | ||
id: 1 | ||
, data: testObj | ||
, data: { a: 1 } | ||
, retry: 0 | ||
@@ -194,27 +361,86 @@ , timeout: 60 | ||
expect(function() { queue.toClient(job) }).to.throw(Error); | ||
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job)); | ||
queue.client.lpush(queue.runQueue, job.id); | ||
var p = queue.remove(job.id); | ||
queue.client.stream.destroy(); | ||
return expect(p).to.eventually.be.rejectedWith(Error); | ||
}); | ||
}); | ||
describe('fromClient', function() { | ||
it('should convert redis job into original format', function() { | ||
it('should reject if job id is missing', function() { | ||
var job = { | ||
id: '1' | ||
, data: '{"a":1}' | ||
, retry: '0' | ||
, timeout: '120' | ||
id: 1 | ||
, data: { a: 1 } | ||
, retry: 0 | ||
, timeout: 60 | ||
}; | ||
expect(queue.fromClient(job)).to.deep.equal({ | ||
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if job data is missing', function() { | ||
var job = { | ||
id: 1 | ||
, data: { a: 1 } | ||
, retry: 0 | ||
, timeout: 120 | ||
, timeout: 60 | ||
}; | ||
queue.client.lpush(queue.runQueue, job.id); | ||
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
}); | ||
describe('count', function() { | ||
it('should return work queue job count by default', function() { | ||
return queue.add({ a: 1 }).then(function() { | ||
return expect(queue.count()).to.eventually.equal(1); | ||
}); | ||
}); | ||
it('should trigger error if redis job data is invalid', function() { | ||
it('should return queue job count for specified queue', function() { | ||
queue.client.lpush(queue.runQueue, 1); | ||
return expect(queue.count('run')).to.eventually.equal(1); | ||
}); | ||
it('should reject if queue name is invalid', function() { | ||
return expect(queue.count('invalid')).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if queue data is invalid', function() { | ||
queue.client.set(queue.workQueue, 1); | ||
return expect(queue.count()).to.eventually.be.rejectedWith(Error); | ||
}); | ||
}); | ||
describe('get', function() { | ||
it('should reject if no id given', function() { | ||
return expect(queue.get()).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should return the job', function() { | ||
return queue.add({ a: 1 }).then(function(job) { | ||
return expect(queue.get(job.id)).to.eventually.be.fulfilled; | ||
}); | ||
}); | ||
it('should return the job properly formatted', function() { | ||
return queue.add({ a: 1 }).then(function(j1) { | ||
return queue.get(j1.id).then(function(j2) { | ||
expect(j2.id).to.equal(j1.id); | ||
expect(j2.data).to.deep.equal(j1.data); | ||
expect(j2.retry).to.equal(j1.retry); | ||
expect(j2.timeout).to.equal(j1.timeout); | ||
}); | ||
}); | ||
}); | ||
it('should reject on connection failure', function() { | ||
var job = { | ||
id: 1 | ||
, data: 'a:1' | ||
, data: { a: 1 } | ||
, retry: 0 | ||
@@ -224,6 +450,50 @@ , timeout: 60 | ||
expect(function() { queue.fromClient(job) }).to.throw(Error); | ||
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job)); | ||
var p = queue.get(job.id); | ||
queue.client.stream.destroy(); | ||
return expect(p).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if job data is missing', function() { | ||
return expect(queue.get(1)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if job data is invalid', function() { | ||
var job = { | ||
id: '1' | ||
, data: 'a:1' | ||
, retry: '0' | ||
, timeout: '60' | ||
}; | ||
queue.client.hmset(queue.prefix + ':' + job.id, job); | ||
return expect(queue.get(job.id)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
}); | ||
describe('stop', function() { | ||
it('should set shutdown to true', function() { | ||
queue.stop(); | ||
expect(queue.shutdown).to.be.true; | ||
}); | ||
}); | ||
describe('restart', function() { | ||
it('should restart listener', function() { | ||
var stub = sinon.stub(queue, 'listen'); | ||
queue.restart(); | ||
expect(stub).to.have.been.calledOnce; | ||
}); | ||
it('should throw error if already running', function() { | ||
var stub = sinon.stub(queue, 'listen'); | ||
queue.running = true; | ||
expect(function() { queue.restart(); }).to.throw(Error); | ||
expect(stub).to.not.have.been.called; | ||
}); | ||
}); | ||
describe('worker', function() { | ||
@@ -304,6 +574,4 @@ it('should register a job handler', function() { | ||
var s0 = sinon.stub(queue, 'nextJob'); | ||
var s1 = sinon.stub(queue, 'handleStatus'); | ||
var s1 = sinon.stub(queue, 'readJob'); | ||
var s2 = sinon.stub(queue, 'handleJob'); | ||
s0.returns(Promise.resolve(true)); | ||
s1.returns(Promise.resolve(true)); | ||
@@ -314,3 +582,2 @@ s2.onCall(0).returns(Promise.resolve(true)); | ||
return queue.run().catch(function(err) { | ||
expect(s0).to.have.been.calledTwice; | ||
expect(s1).to.have.been.calledTwice; | ||
@@ -327,8 +594,7 @@ expect(s2).to.have.been.calledTwice; | ||
resolve(); | ||
}, 10); | ||
}, 25); | ||
}); | ||
}; | ||
var s0 = sinon.stub(queue, 'nextJob', p); | ||
var s1 = sinon.stub(queue, 'handleStatus', p); | ||
var s1 = sinon.stub(queue, 'readJob', p); | ||
var s2 = sinon.stub(queue, 'handleJob', p); | ||
@@ -341,8 +607,7 @@ | ||
queue.shutdown = true; | ||
}, 25); | ||
}, 80); | ||
return queue.run().then(function() { | ||
expect(s0).to.have.been.calledOnce; | ||
expect(s1).to.have.been.calledOnce; | ||
expect(s2).to.have.been.calledOnce; | ||
expect(s1).to.have.been.calledTwice; | ||
expect(s2).to.have.been.calledTwice; | ||
expect(spy).to.have.been.calledOnce; | ||
@@ -353,3 +618,3 @@ }); | ||
describe('handleStatus', function() { | ||
describe('readJob', function() { | ||
it('should get next job again if input is status_timeout', function() { | ||
@@ -366,3 +631,3 @@ var job = { | ||
return queue.handleStatus(queue.status_timeout).then(function(res) { | ||
return queue.readJob(queue.status_timeout).then(function(res) { | ||
expect(stub).to.have.been.calledOnce; | ||
@@ -373,3 +638,3 @@ expect(res).to.equal(job); | ||
it('should pass on job object unchanged', function() { | ||
it('should pass on job object as promise', function() { | ||
var job = { | ||
@@ -382,3 +647,3 @@ id: 1 | ||
return expect(queue.handleStatus(job)).to.equal(job); | ||
return expect(queue.readJob(job)).to.eventually.equal(job); | ||
}); | ||
@@ -677,3 +942,3 @@ }); | ||
it('should reject on connection failure', function() { | ||
it('should reject if redis return empty error', function() { | ||
var job = { | ||
@@ -689,2 +954,32 @@ id: 1 | ||
var sandbox = sinon.sandbox.create(); | ||
var s0 = sandbox.stub(queue.client, 'multi').returnsThis(); | ||
var s1 = sandbox.stub(queue.client, 'rpoplpush').returnsThis(); | ||
var s2 = sandbox.stub(queue.client, 'hset').returnsThis(); | ||
var s3 = sandbox.stub(queue.client, 'exec', function(cb) { | ||
cb([], null); | ||
}); | ||
return queue.moveJob(job).catch(function(err) { | ||
sandbox.restore(); | ||
expect(s0).to.have.been.calledOnce; | ||
expect(s1).to.have.been.calledOnce; | ||
expect(s2).to.have.been.calledOnce; | ||
expect(s3).to.have.been.calledOnce; | ||
expect(err).to.be.an.instanceof(Error); | ||
}); | ||
}); | ||
it('should reject if redis return array of errors', function() { | ||
var job = { | ||
id: 1 | ||
, data: { a: 1 } | ||
, retry: 0 | ||
, timeout: 60 | ||
}; | ||
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job)); | ||
queue.client.lpush(queue.runQueue, job.id); | ||
var p = queue.moveJob(job); | ||
@@ -722,4 +1017,8 @@ queue.client.stream.destroy(); | ||
describe('remove', function() { | ||
it('should remove job from queue and purge job data', function() { | ||
describe('recoverJob', function() { | ||
it('should do nothing if no job found in run queue', function() { | ||
return expect(queue.recoverJob()).to.eventually.be.fulfilled; | ||
}); | ||
it('should recover job from run queue', function() { | ||
var job = { | ||
@@ -735,11 +1034,8 @@ id: 1 | ||
return queue.remove(job.id).then(function() { | ||
return queue.count('run').then(function(count) { | ||
expect(count).to.equal(0); | ||
return expect(queue.get(job.id)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
return queue.recoverJob().then(function() { | ||
return expect(queue.count('run')).to.eventually.equal(0); | ||
}); | ||
}); | ||
it('should reject on connection failure', function() { | ||
it('should recover job into work queue', function() { | ||
var job = { | ||
@@ -755,10 +1051,9 @@ id: 1 | ||
var p = queue.remove(job.id); | ||
queue.client.stream.destroy(); | ||
return expect(p).to.eventually.be.rejectedWith(Error); | ||
return queue.recoverJob().then(function() { | ||
return expect(queue.count('work')).to.eventually.equal(1); | ||
}); | ||
}); | ||
it('should reject if job id is missing', function() { | ||
var job = { | ||
it('should reject if run queue has more than 1 job', function() { | ||
var j1 = { | ||
id: 1 | ||
@@ -770,9 +1065,5 @@ , data: { a: 1 } | ||
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if job data is missing', function() { | ||
var job = { | ||
id: 1 | ||
, data: { a: 1 } | ||
var j2 = { | ||
id: 2 | ||
, data: { b: 1 } | ||
, retry: 0 | ||
@@ -782,125 +1073,55 @@ , timeout: 60 | ||
queue.client.lpush(queue.runQueue, job.id); | ||
queue.client.hmset(queue.prefix + ':' + j1.id, queue.toClient(j1)); | ||
queue.client.lpush(queue.runQueue, j1.id); | ||
queue.client.hmset(queue.prefix + ':' + j2.id, queue.toClient(j2)); | ||
queue.client.lpush(queue.runQueue, j2.id); | ||
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error); | ||
return expect(queue.recoverJob()).to.eventually.be.rejectedWith(Error); | ||
}); | ||
}); | ||
describe('stop', function() { | ||
it('should set shutdown to true', function() { | ||
queue.stop(); | ||
expect(queue.shutdown).to.be.true; | ||
}); | ||
}); | ||
it('should reject on connection failure', function() { | ||
var stub = sinon.stub(queue, 'count').returns(Promise.resolve(1)); | ||
describe('restart', function() { | ||
it('should restart listener', function() { | ||
var stub = sinon.stub(queue, 'listen'); | ||
queue.restart(); | ||
expect(stub).to.have.been.calledOnce; | ||
}); | ||
}); | ||
var p = queue.recoverJob(); | ||
queue.client.stream.destroy(); | ||
describe('count', function() { | ||
it('should return work queue job count by default', function() { | ||
return queue.add({ a: 1 }).then(function() { | ||
return expect(queue.count()).to.eventually.equal(1); | ||
}); | ||
return expect(p).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should return queue job count for specified queue', function() { | ||
queue.client.lpush(queue.runQueue, 1); | ||
it('should reject if response is unexpected', function() { | ||
var stub = sinon.stub(queue, 'count').returns(Promise.resolve(1)); | ||
return expect(queue.count('run')).to.eventually.equal(1); | ||
return expect(queue.recoverJob()).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if queue name is invalid', function() { | ||
return expect(queue.count('invalid')).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if queue data is invalid', function() { | ||
queue.client.set(queue.workQueue, 1); | ||
return expect(queue.count()).to.eventually.be.rejectedWith(Error); | ||
}); | ||
}); | ||
describe('get', function() { | ||
it('should reject if no id given', function() { | ||
return expect(queue.get()).to.eventually.be.rejectedWith(Error); | ||
describe('nextId', function() { | ||
it('should setup queue id and return it', function() { | ||
return expect(queue.nextId()).to.eventually.equal(1); | ||
}); | ||
it('should return the job', function() { | ||
return queue.add({ a: 1 }).then(function(job) { | ||
return expect(queue.get(job.id)).to.eventually.be.fulfilled; | ||
}); | ||
}); | ||
it('should increment queue id and return it', function() { | ||
queue.client.set(queue.prefix + ':id', 5); | ||
it('should return the job properly formatted', function() { | ||
return queue.add({ a: 1 }).then(function(j1) { | ||
return queue.get(j1.id).then(function(j2) { | ||
expect(j2.id).to.equal(j1.id); | ||
expect(j2.data).to.deep.equal(j1.data); | ||
expect(j2.retry).to.equal(j1.retry); | ||
expect(j2.timeout).to.equal(j1.timeout); | ||
}); | ||
}); | ||
return expect(queue.nextId()).to.eventually.equal(6); | ||
}); | ||
it('should reject on connection failure', function() { | ||
var job = { | ||
id: 1 | ||
, data: { a: 1 } | ||
, retry: 0 | ||
, timeout: 60 | ||
}; | ||
it('should reject if queue id is not number', function() { | ||
queue.client.hmset(queue.prefix + ':id', { a: 1 }); | ||
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job)); | ||
var p = queue.get(job.id); | ||
queue.client.stream.destroy(); | ||
return expect(p).to.eventually.be.rejectedWith(Error); | ||
return expect(queue.nextId()).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if job data is missing', function() { | ||
return expect(queue.get(1)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should reject if job data is invalid', function() { | ||
var job = { | ||
id: '1' | ||
, data: 'a:1' | ||
, retry: '0' | ||
, timeout: '60' | ||
}; | ||
queue.client.hmset(queue.prefix + ':' + job.id, job); | ||
return expect(queue.get(job.id)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
}); | ||
describe('add', function() { | ||
it('should reject empty data', function() { | ||
return expect(queue.add()).to.eventually.be.rejectedWith(Error); | ||
describe('nextJob', function() { | ||
it('should return the next job on queue', function() { | ||
queue.add({ a: 1 }); | ||
return expect(queue.nextJob()).to.eventually.be.fulfilled; | ||
}); | ||
it('should reject non-object data', function() { | ||
return expect(queue.add('invalid')).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should return the job properly formatted', function() { | ||
queue.add({ a: 1 }) | ||
it('should add a new job to queue', function() { | ||
return queue.add({ a: 1 }).then(function(job) { | ||
queue.client.hgetall(queue.prefix + ':' + job.id, function(err, res) { | ||
expect(res.id).to.equal('1'); | ||
expect(res.data).to.equal(JSON.stringify({ a: 1 })); | ||
expect(res.retry).to.equal('0'); | ||
expect(res.timeout).to.equal('60'); | ||
}); | ||
}); | ||
}); | ||
it('should return the added job', function() { | ||
return queue.add({ a: 1 }).then(function(job) { | ||
return queue.nextJob().then(function(job) { | ||
expect(job.id).to.equal(1); | ||
@@ -913,66 +1134,90 @@ expect(job.data).to.deep.equal({ a: 1 }); | ||
it('should allow custom job options', function() { | ||
return queue.add({ a: 1 }, { retry: 1, timeout: 120 }).then(function(job) { | ||
expect(job.id).to.equal(1); | ||
expect(job.data).to.deep.equal({ a: 1 }); | ||
expect(job.retry).to.equal(1); | ||
expect(job.timeout).to.equal(120); | ||
}); | ||
it('should wait for next job to be available', function() { | ||
setTimeout(function() { | ||
queue.add({ a: 1 }); | ||
}, 25); | ||
return expect(queue.nextJob()).to.eventually.be.fulfilled; | ||
}); | ||
it('should overwrite existing job, and requeue job id', function() { | ||
return queue.add({ a: 1 }, { timeout: 120 }).then(function(job) { | ||
return queue.add({ b: 1 }, { id: job.id }).then(function() { | ||
queue.client.hgetall(queue.prefix + ':' + job.id, function(err, res) { | ||
expect(res.id).to.equal('1'); | ||
expect(res.data).to.equal(JSON.stringify({ b: 1 })); | ||
expect(res.retry).to.equal('0'); | ||
expect(res.timeout).to.equal('60'); | ||
}); | ||
}); | ||
it('should block for n seconds before returning status_timeout', function() { | ||
var stub = sinon.stub(queue.bclient, 'brpoplpush', function(a1, a2, a3, cb) { | ||
setTimeout(function() { | ||
cb(null, null); | ||
}, 25); | ||
}); | ||
return expect(queue.nextJob()).to.eventually.equal(queue.status_timeout); | ||
}); | ||
it('should increment job id on each call', function() { | ||
queue.add({ a: 1 }); | ||
queue.add({ b: 1 }); | ||
return queue.add({ c: 1 }).then(function(job) { | ||
expect(job.id).to.equal(3); | ||
expect(job.data).to.deep.equal({ c: 1 }); | ||
it('should reject on connection failure', function() { | ||
var p = queue.nextJob(); | ||
setTimeout(function() { | ||
queue.bclient.stream.destroy(); | ||
}, 25); | ||
return expect(p).to.eventually.be.rejectedWith(Error); | ||
}); | ||
}); | ||
describe('toClient', function() { | ||
it('should convert job into redis format', function() { | ||
var job = { | ||
id: 1 | ||
, data: { a: 1 } | ||
, retry: 0 | ||
, timeout: 120 | ||
}; | ||
expect(queue.toClient(job)).to.deep.equal({ | ||
id: 1 | ||
, data: '{"a":1}' | ||
, retry: 0 | ||
, timeout: 120 | ||
}); | ||
}); | ||
it('should reject if data has cyclic structure', function() { | ||
it('should trigger error if job data is not serializable', function() { | ||
var testObj = {}; | ||
testObj.key = 'value'; | ||
testObj.cycle = testObj; | ||
return expect(queue.add(testObj)).to.eventually.be.rejectedWith(Error); | ||
}); | ||
it('should emit event when done', function() { | ||
var spy = sinon.spy(); | ||
queue.on('add ok', spy); | ||
var job = { | ||
id: 1 | ||
, data: testObj | ||
, retry: 0 | ||
, timeout: 60 | ||
}; | ||
return queue.add({ a: 1 }).then(function(job) { | ||
expect(spy).to.have.been.calledOnce; | ||
expect(spy).to.have.been.calledWithMatch(job); | ||
}); | ||
expect(function() { queue.toClient(job) }).to.throw(Error); | ||
}); | ||
}); | ||
it('should emit event when failed', function() { | ||
queue.client.set(queue.workQueue, 1); | ||
describe('fromClient', function() { | ||
it('should convert redis job into original format', function() { | ||
var job = { | ||
id: '1' | ||
, data: '{"a":1}' | ||
, retry: '0' | ||
, timeout: '120' | ||
}; | ||
var spy = sinon.spy(); | ||
queue.on('add error', spy); | ||
return expect(queue.add({ a: 1 })).to.eventually.be.rejectedWith(Error); | ||
expect(queue.fromClient(job)).to.deep.equal({ | ||
id: 1 | ||
, data: { a: 1 } | ||
, retry: 0 | ||
, timeout: 120 | ||
}); | ||
}); | ||
it('should reject on connection failure', function() { | ||
var p = queue.add({ a: 1 }); | ||
it('should trigger error if redis job data is invalid', function() { | ||
var job = { | ||
id: 1 | ||
, data: 'a:1' | ||
, retry: 0 | ||
, timeout: 60 | ||
}; | ||
// TODO: trigger exec error handling, currently the error is from redis client | ||
queue.client.stream.destroy(); | ||
return expect(p).to.eventually.be.rejectedWith(Error); | ||
expect(function() { queue.fromClient(job) }).to.throw(Error); | ||
}); | ||
@@ -979,0 +1224,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
59253
1692
231