Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

decent

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

decent - npm Package Compare versions

Comparing version 0.1.5 to 0.1.6

12

CHANGELOG.md
changelog
Changelog
=========
# v1 release
# 0.1.x release
## v0.1.6
- Enhance: stalled runQueue job are moved to workQueue on startup
- Enhance: update to verbose error message
- Enhance: reached 100% code coverage
- Enhance: mark api as public/private
## v0.1.5

@@ -8,0 +16,0 @@

534

lib/decent.js

@@ -29,3 +29,3 @@

if (!name) {
throw new Error('queue name required');
throw new Error('queue name is required');
}

@@ -71,16 +71,161 @@

// queue status
this.shutdown = false;
this.running = false;
}
/**
* Increment queue id and return it
* Add a job onto the work queue, overwrite duplicate job
*
* @param Object data Actual data for worker to process
* @param Object opts Additional job options
* @return Promise
* @api Public
*/
Queue.prototype.nextId = function() {
Queue.prototype.add = function(data, opts) {
var self = this;
// note that queue id always increment, even if we don't use it
return this.nextId().then(function(id) {
if (!data || typeof data !== 'object') {
throw new Error('job data payload must be an object');
}
opts = opts || {};
// job structure
var job = {
id: opts.id || id
, data: data
, retry: opts.retry || 0
, timeout: opts.timeout || 60
};
// format job for redis, invalid data will reject promise
var rjob = self.toClient(job);
// add job as hash, push its id onto queue
// note that overwrite existing job will requeue job id
return new Promise(function(resolve, reject) {
self.client.multi()
.hmset(self.prefix + ':' + rjob.id, rjob)
.lrem(self.workQueue, 1, rjob.id)
.lpush(self.workQueue, rjob.id)
.exec(function(err, res) {
// client error
if (err) {
// only return the first error
// redis client SHOULD always return non-empty array of Error
// but better safe than sorry
if (err.length > 0) {
err = err[0];
} else {
err = new Error('empty error from redis client');
}
self.emit('add error', err);
reject(err);
// command failure
// we need to check each command is returning expected result
// as err is null in this case, ref: http://git.io/bT5C4Q
// redis client SHOULD always return OK for set command
} else if (res[0] !== 'OK') {
err = new Error('fail to create job data on redis');
self.emit('add error', err);
reject(err);
// duplicate job id should be purged
} else if (isNaN(parseInt(res[1], '10')) || res[1] > 1) {
err = new Error('fail to remove duplicate job id from queue');
self.emit('add error', err);
reject(err);
// redis client SHOULD NOT fail if 2nd command is successful
// job queue must not be empty now
} else if (isNaN(parseInt(res[2], '10')) || res[2] < 1) {
err = new Error('fail to push job id onto queue');
self.emit('add error', err);
reject(err);
} else {
self.emit('add ok', job);
resolve(job);
}
});
});
})
};
/**
* Remove a job from queue given the job id
*
* @param Number id Job id
* @return Promise
* @api Public
*/
Queue.prototype.remove = function(id) {
var self = this;
// job done, remove it
return new Promise(function(resolve, reject) {
self.client.incr(self.prefix + ':id', function(err, res) {
self.client.multi()
.lrem(self.runQueue, 1, id)
.del(self.prefix + ':' + id)
.exec(function(err, res) {
// see prototype.add on why verbose checks are needed
// client error
if (err) {
if (err.length > 0) {
err = err[0];
} else {
err = new Error('empty error from redis client');
}
reject(err);
// command error
} else if (res[0] != 1) {
err = new Error('job id missing from queue');
reject(err);
} else if (res[1] != 1) {
err = new Error('job data missing from redis');
reject(err);
} else {
resolve();
}
});
});
};
/**
* Report the current number of jobs in work queue
*
* @param String name Queue name
* @return Promise
* @api Public
*/
Queue.prototype.count = function(name) {
name = name || 'work';
var self = this;
return new Promise(function(resolve, reject) {
if (!self[name + 'Queue']) {
reject(new Error('invalid queue name'));
return;
}
self.client.llen(self[name + 'Queue'], function(err, res) {
if (err) {

@@ -98,7 +243,9 @@ reject(err);

/**
* Retrieve the next job on work queue, put it on the run queue
* Return a job without removing it from redis
*
* @param Number id Job id
* @return Promise
* @api Public
*/
Queue.prototype.nextJob = function() {
Queue.prototype.get = function(id) {

@@ -109,20 +256,27 @@ var self = this;

self.bclient.brpoplpush(
self.workQueue
, self.runQueue
, self.config.blockTimeout
, function(err, id) {
// client error
if (err) {
reject(err);
if (!id) {
reject(new Error('job id required'));
return;
}
// blocking timeout, return special code
} else if (id === null) {
resolve(self.status_timeout);
// get job
self.client.hgetall(self.prefix + ':' + id, function(err, job) {
// client error
if (err) {
reject(err);
} else {
resolve(self.get(id));
} else if (job === null) {
reject(new Error('job data missing from redis'));
} else {
// format job for client, handle invalid job
try {
job = self.fromClient(job);
} catch(err) {
reject(err);
}
resolve(job);
}
);
});

@@ -134,16 +288,10 @@ });

/**
* Convert job data into a format supported by redis client
* Stop queue processing gracefully
*
* @param Object job Queue format
* @return Object
* @return Void
* @api Public
*/
Queue.prototype.toClient = function(job) {
Queue.prototype.stop = function() {
// all values must be primitive type
return {
id: job.id
, data: JSON.stringify(job.data)
, retry: job.retry
, timeout: job.timeout
};
this.shutdown = true;

@@ -153,17 +301,16 @@ };

/**
* Convert redis data into the original job format
* Restart queue processing
*
* @param Object job Redis format
* @return Object
* @return Void
* @api Public
*/
Queue.prototype.fromClient = function(job) {
Queue.prototype.restart = function() {
// values in redis are stored as string
return {
id: parseInt(job.id, 10)
, data: JSON.parse(job.data)
, retry: parseInt(job.retry, 10)
, timeout: parseInt(job.timeout, 10)
};
// prevent duplicate worker
if (this.running) {
throw new Error('worker is already running');
}
this.listen();
};

@@ -176,2 +323,3 @@

* @return Void
* @api Public
*/

@@ -181,7 +329,7 @@ Queue.prototype.worker = function(handler) {

if (typeof handler !== 'function') {
throw new Error('worker must be a function');
throw new Error('job handler must be a function');
}
if (this.handler) {
throw new Error('worker can only be registered once');
throw new Error('job handler can only be registered once');
}

@@ -200,2 +348,3 @@

* @return Promise
* @api Private
*/

@@ -206,5 +355,7 @@ Queue.prototype.listen = function() {

this.running = true;
this.emit('queue start');
return this.run().catch(function(err) {
self.running = false;
self.emit('queue exit', err);

@@ -219,2 +370,3 @@ });

* @return Promise
* @api Private
*/

@@ -225,5 +377,5 @@ Queue.prototype.run = function() {

return this.nextJob()
return this.recoverJob()
.then(function(res) {
return self.handleStatus(res);
return self.readJob(res);
})

@@ -234,5 +386,6 @@ .then(function(job) {

.then(function() {
// handle shutdown state
// handle graceful shutdown
if (self.shutdown) {
self.shutdown = false;
self.running = false;
self.emit('queue stop');

@@ -248,20 +401,21 @@ // loop

/**
* Handle known status code from nextJob
* Wait for job on work queue, loop until found
*
* @param Mixed res Result from nextJob
* @return Mixed
* @param Mixed res Can be empty, status code or job
* @return Promise
* @api Private
*/
Queue.prototype.handleStatus = function(res) {
Queue.prototype.readJob = function(res) {
var self = this;
// blocking timeout, try again
if (res === this.status_timeout) {
// first run, or blocking timeout
if (!res || res === this.status_timeout) {
return this.nextJob().then(function(res) {
return self.handleStatus(res);
return self.readJob(res);
});
// pass on result
// pass on job object
} else {
return res;
return Promise.resolve(res);
}

@@ -272,6 +426,7 @@

/**
* Handle job from nextJob
* Process job with handler
*
* @param Object job Formatted job
* @return Promise
* @api Private
*/

@@ -286,3 +441,3 @@ Queue.prototype.handleJob = function(job) {

if (!self.handler) {
reject(new Error('worker must be registered first'));
reject(new Error('job handler must be registered first'));
return;

@@ -294,3 +449,3 @@ }

setTimeout(function() {
reject(new Error('job timeout'));
reject(new Error('job timeout threshold reached'));
}, job.timeout * 1000);

@@ -338,2 +493,3 @@ }

* @return Promise
* @api Private
*/

@@ -367,3 +523,3 @@ Queue.prototype.moveJob = function(job) {

} else {
err = new Error('unknown error from redis client');
err = new Error('empty error from redis client');
}

@@ -374,9 +530,7 @@

// command error
// job id must exist on queue
} else if (res[0] === null) {
err = new Error('unable to find job id on queue');
err = new Error('job id missing from queue');
reject(err);
// retry field must exist already
} else if (isNaN(parseInt(res[1], '10')) || res[1] > 0) {
err = new Error('unable to update job retry count');
err = new Error('partial job data, retry count missing');
reject(err);

@@ -393,43 +547,37 @@ } else {

/**
* Remove a job from queue given the job id
* Recover job from runQueue back to start of workQueue
*
* @param Number id Job id
* @return Promise
* @api Private
*/
Queue.prototype.remove = function(id) {
Queue.prototype.recoverJob = function() {
var self = this;
// job done, remove it
return new Promise(function(resolve, reject) {
return this.count('run').then(function(count) {
self.client.multi()
.lrem(self.runQueue, 1, id)
.del(self.prefix + ':' + id)
.exec(function(err, res) {
// see prototype.add on why verbose checks are needed
// nothing to do if last session shutdown gracefully
if (count === 0) {
return;
// client error
if (err) {
if (err.length > 0) {
err = err[0];
// by design there should only be at max 1 job in runQueue
} else if (count > 1) {
return Promise.reject(
new Error('more than 1 job in queue, purge manually')
);
// move the job to work queue, note that retry count does not increase
} else {
return new Promise(function(resolve, reject) {
self.client.rpoplpush(self.runQueue, self.workQueue, function(err, res) {
if (err) {
reject(err);
} else if (res === null) {
reject(new Error('job id missing from queue'));
} else {
err = new Error('unknown error from redis client');
resolve();
}
reject(err);
// command error
// job id must exist on queue
} else if (res[0] != 1) {
err = new Error('unable to remove job id');
reject(err);
// job data must exist
} else if (res[1] != 1) {
err = new Error('unable to remove job data');
reject(err);
} else {
resolve();
}
});
});
}

@@ -441,39 +589,9 @@ });

/**
* Stop queue processing gracefully
* Increment queue id and return it
*
* @return Void
*/
Queue.prototype.stop = function() {
this.shutdown = true;
};
/**
* Restart queue processing
*
* @return Void
*/
Queue.prototype.restart = function() {
this.listen();
};
/**
* Re-run all previously failed jobs
*
* @return Promise
* @api Private
*/
//Queue.prototype.requeue = function() {};
Queue.prototype.nextId = function() {
/**
* Report the current number of jobs in work queue
*
* @param String name Queue name
* @return Promise
*/
Queue.prototype.count = function(name) {
name = name || 'work';
var self = this;

@@ -483,8 +601,3 @@

if (!self[name + 'Queue']) {
reject(new Error('invalid queue name'));
return;
}
self.client.llen(self[name + 'Queue'], function(err, res) {
self.client.incr(self.prefix + ':id', function(err, res) {
if (err) {

@@ -502,8 +615,8 @@ reject(err);

/**
* Return a job without removing it from redis
* Retrieve the next job on work queue, put it on the run queue
*
* @param Number id Job id
* @return Promise
* @api Private
*/
Queue.prototype.get = function(id) {
Queue.prototype.nextJob = function() {

@@ -514,28 +627,20 @@ var self = this;

if (!id) {
reject(new Error('job id missing'));
return;
}
self.bclient.brpoplpush(
self.workQueue
, self.runQueue
, self.config.blockTimeout
, function(err, id) {
// client error
if (err) {
reject(err);
// get job
self.client.hgetall(self.prefix + ':' + id, function(err, job) {
// client error
if (err) {
reject(err);
// blocking timeout, return special code
} else if (id === null) {
resolve(self.status_timeout);
// job data missing
} else if (job === null) {
reject(new Error('job data missing'));
} else {
// format job for client, handle invalid job
try {
job = self.fromClient(job);
} catch(err) {
reject(err);
} else {
resolve(self.get(id));
}
resolve(job);
}
});
);

@@ -547,85 +652,37 @@ });

/**
* Add a job onto the work queue, overwrite duplicate job
* Convert job data into a format supported by redis client
*
* @param Object data Actual data for worker to process
* @param Object opts Additional job options
* @return Promise
* @param Object job Queue format
* @return Object
* @api Private
*/
Queue.prototype.add = function(data, opts) {
Queue.prototype.toClient = function(job) {
var self = this;
// all values must be primitive type
return {
id: job.id
, data: JSON.stringify(job.data)
, retry: job.retry
, timeout: job.timeout
};
// note that queue id always increment, even if we don't use it
return this.nextId().then(function(id) {
};
if (!data || typeof data !== 'object') {
throw new Error('job data must be an object');
}
/**
* Convert redis data into the original job format
*
* @param Object job Redis format
* @return Object
* @api Private
*/
Queue.prototype.fromClient = function(job) {
opts = opts || {};
// values in redis are stored as string
return {
id: parseInt(job.id, 10)
, data: JSON.parse(job.data)
, retry: parseInt(job.retry, 10)
, timeout: parseInt(job.timeout, 10)
};
// job structure
var job = {
id: opts.id || id
, data: data
, retry: opts.retry || 0
, timeout: opts.timeout || 60
};
// format job for redis, invalid data will reject promise
var rjob = self.toClient(job);
// add job as hash, push its id onto queue
// note that overwrite existing job will requeue job id
return new Promise(function(resolve, reject) {
self.client.multi()
.hmset(self.prefix + ':' + rjob.id, rjob)
.lrem(self.workQueue, 1, rjob.id)
.lpush(self.workQueue, rjob.id)
.exec(function(err, res) {
// client error
if (err) {
// only return the first error
// redis client SHOULD always return non-empty array of Error
// but better safe than sorry
if (err.length > 0) {
err = err[0];
} else {
err = new Error('unknown error from redis client');
}
self.emit('add error', err);
reject(err);
// command failure
// we need to check each command is returning expected result
// as err is null in this case, ref: http://git.io/bT5C4Q
// redis client SHOULD always return OK for set command
} else if (res[0] !== 'OK') {
err = new Error('unable to set job item');
self.emit('add error', err);
reject(err);
// duplicate job id should be purged
} else if (isNaN(parseInt(res[1], '10')) || res[1] > 1) {
err = new Error('unable to purge duplicate job id');
self.emit('add error', err);
reject(err);
// redis client SHOULD NOT fail if 2nd command is successful
// job queue must not be empty now
} else if (isNaN(parseInt(res[2], '10')) || res[2] < 1) {
err = new Error('unable to add job id');
self.emit('add error', err);
reject(err);
} else {
self.emit('add ok', job);
resolve(job);
}
});
});
})
};

@@ -637,2 +694,3 @@

* @return Void
* @api Private
*/

@@ -651,2 +709,3 @@ Queue.prototype.clientReady = function() {

* @return Void
* @api Private
*/

@@ -664,4 +723,5 @@ Queue.prototype.clientConnect = function() {

*
* @param Object err Error from redis client
* @param Object err Error from redis client
* @return Void
* @api Private
*/

@@ -680,2 +740,3 @@ Queue.prototype.clientError = function(err) {

* @return Void
* @api Private
*/

@@ -692,2 +753,3 @@ Queue.prototype.clientEnd = function() {

* @return Void
* @api Private
*/

@@ -709,2 +771,3 @@ Queue.prototype.clientDrain = function() {

* @return Void
* @api Private
*/

@@ -719,2 +782,1 @@ Queue.prototype.clientIdle = function() {

};
{
"name": "decent",
"version": "0.1.5",
"version": "0.1.6",
"description": "This is a decent Redis-based job queue for Node.",

@@ -5,0 +5,0 @@ "main": "index.js",

@@ -22,4 +22,5 @@

- **Simple API**, powered by `Promise`, works in harmony with your generator library.
- **Automatic job clean up and recovery**, no need to purge jobs manually.
- **Proper code coverage**, we put extra emphasis on negative tests, because that's when most queues fall apart and cause headaches.
- **Annotated source code**, less than 700 loc in total.
- **Annotated source code**, less than 800 loc in total.
- No dependency besides `redis` driver, make use of native promise whenever possible, fallback to `bluebird` for older Node release.

@@ -223,3 +224,3 @@ - Rich events to aid automation, status monitoring or building larger pipeline.

- API for re-queueing failed jobs
- Use-case examples
- Use case examples
- Web UI

@@ -226,0 +227,0 @@

@@ -15,2 +15,3 @@

var decent = require('../index.js');
var QueueClass = require('../lib/decent.js');
var EventEmitter = require('events').EventEmitter;

@@ -57,3 +58,5 @@ var Promise = require('native-or-bluebird');

describe('constructor', function() {
it('should return a decent instance properly', function() {
it('should return a decent instance', function() {
expect(queue).to.be.an.instanceof(QueueClass);
expect(queue.name).to.equal('test');

@@ -78,2 +81,4 @@ expect(queue.port).to.equal(6379);

expect(queue.status_timeout).to.equal(1);
expect(queue.running).to.be.false;
expect(queue.shutdown).to.be.false;
});

@@ -98,69 +103,202 @@

describe('nextId', function() {
it('should setup queue id and return it', function() {
return expect(queue.nextId()).to.eventually.equal(1);
describe('add', function() {
it('should reject empty data', function() {
return expect(queue.add()).to.eventually.be.rejectedWith(Error);
});
it('should increment queue id and return it', function() {
queue.client.set(queue.prefix + ':id', 5);
it('should reject non-object data', function() {
return expect(queue.add('invalid')).to.eventually.be.rejectedWith(Error);
});
return expect(queue.nextId()).to.eventually.equal(6);
it('should add a new job to queue', function() {
return queue.add({ a: 1 }).then(function(job) {
queue.client.hgetall(queue.prefix + ':' + job.id, function(err, res) {
expect(res.id).to.equal('1');
expect(res.data).to.equal(JSON.stringify({ a: 1 }));
expect(res.retry).to.equal('0');
expect(res.timeout).to.equal('60');
});
});
});
it('should reject if queue id is not number', function() {
queue.client.hmset(queue.prefix + ':id', { a: 1 });
it('should return the added job', function() {
return queue.add({ a: 1 }).then(function(job) {
expect(job.id).to.equal(1);
expect(job.data).to.deep.equal({ a: 1 });
expect(job.retry).to.equal(0);
expect(job.timeout).to.equal(60);
});
});
return expect(queue.nextId()).to.eventually.be.rejectedWith(Error);
it('should allow custom job options', function() {
return queue.add({ a: 1 }, { retry: 1, timeout: 120 }).then(function(job) {
expect(job.id).to.equal(1);
expect(job.data).to.deep.equal({ a: 1 });
expect(job.retry).to.equal(1);
expect(job.timeout).to.equal(120);
});
});
});
describe('nextJob', function() {
it('should return the next job on queue', function() {
it('should overwrite existing job, and requeue job id', function() {
return queue.add({ a: 1 }, { timeout: 120 }).then(function(job) {
return queue.add({ b: 1 }, { id: job.id }).then(function() {
queue.client.hgetall(queue.prefix + ':' + job.id, function(err, res) {
expect(res.id).to.equal('1');
expect(res.data).to.equal(JSON.stringify({ b: 1 }));
expect(res.retry).to.equal('0');
expect(res.timeout).to.equal('60');
});
});
});
});
it('should increment job id on each call', function() {
queue.add({ a: 1 });
return expect(queue.nextJob()).to.eventually.be.fulfilled;
queue.add({ b: 1 });
return queue.add({ c: 1 }).then(function(job) {
expect(job.id).to.equal(3);
expect(job.data).to.deep.equal({ c: 1 });
});
});
it('should return the job properly formatted', function() {
queue.add({ a: 1 })
it('should reject if data has cyclic structure', function() {
var testObj = {};
testObj.key = 'value';
testObj.cycle = testObj;
return expect(queue.add(testObj)).to.eventually.be.rejectedWith(Error);
});
return queue.nextJob().then(function(job) {
expect(job.id).to.equal(1);
expect(job.data).to.deep.equal({ a: 1 });
expect(job.retry).to.equal(0);
expect(job.timeout).to.equal(60);
it('should emit event when done', function() {
var spy = sinon.spy();
queue.on('add ok', spy);
return queue.add({ a: 1 }).then(function(job) {
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWithMatch(job);
});
});
it('should wait for next job to be available', function() {
setTimeout(function() {
queue.add({ a: 1 });
}, 25);
return expect(queue.nextJob()).to.eventually.be.fulfilled;
it('should emit event when failed', function() {
queue.client.set(queue.workQueue, 1);
var spy = sinon.spy();
queue.on('add error', spy);
return expect(queue.add({ a: 1 })).to.eventually.be.rejectedWith(Error);
});
it('should block for n seconds before returning status_timeout', function() {
var stub = sinon.stub(queue.bclient, 'brpoplpush', function(a1, a2, a3, cb) {
setTimeout(function() {
cb(null, null);
}, 25);
it('should reject if redis return empty error', function() {
var sandbox = sinon.sandbox.create();
var s0 = sandbox.stub(queue.client, 'multi').returnsThis();
var s1 = sandbox.stub(queue.client, 'hmset').returnsThis();
var s2 = sandbox.stub(queue.client, 'lrem').returnsThis();
var s3 = sandbox.stub(queue.client, 'lpush').returnsThis();
var s4 = sandbox.stub(queue.client, 'exec', function(cb) {
cb([], null);
});
return expect(queue.nextJob()).to.eventually.equal(queue.status_timeout);
var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(s0).to.have.been.calledOnce;
expect(s1).to.have.been.calledOnce;
expect(s2).to.have.been.calledOnce;
expect(s3).to.have.been.calledOnce;
expect(s4).to.have.been.calledOnce;
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(err);
});
});
it('should reject on connection failure', function() {
var p = queue.nextJob();
it('should reject if redis return array of errors', function() {
var error = new Error('some error');
var sandbox = sinon.sandbox.create();
var s0 = sandbox.stub(queue.client, 'multi').returnsThis();
var s1 = sandbox.stub(queue.client, 'hmset').returnsThis();
var s2 = sandbox.stub(queue.client, 'lrem').returnsThis();
var s3 = sandbox.stub(queue.client, 'lpush').returnsThis();
var s4 = sandbox.stub(queue.client, 'exec', function(cb) {
cb([error], null);
});
setTimeout(function() {
queue.bclient.stream.destroy();
}, 25);
var spy = sinon.spy();
queue.on('add error', spy);
return expect(p).to.eventually.be.rejectedWith(Error);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(spy).to.have.been.calledWith(error);
expect(err).to.equal(error);
});
});
it('should reject if hmset response has unexpected value', function() {
var sandbox = sinon.sandbox.create();
var s0 = sinon.stub(queue.client, 'multi').returnsThis();
var s1 = sinon.stub(queue.client, 'hmset').returnsThis();
var s2 = sinon.stub(queue.client, 'lrem').returnsThis();
var s3 = sinon.stub(queue.client, 'lpush').returnsThis();
var s4 = sinon.stub(queue.client, 'exec', function(cb) {
cb(null, ['NOT OK']);
});
var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(err);
});
});
it('should reject if lrem response has unexpected value', function() {
var sandbox = sinon.sandbox.create();
var s0 = sinon.stub(queue.client, 'multi').returnsThis();
var s1 = sinon.stub(queue.client, 'hmset').returnsThis();
var s2 = sinon.stub(queue.client, 'lrem').returnsThis();
var s3 = sinon.stub(queue.client, 'lpush').returnsThis();
var s4 = sinon.stub(queue.client, 'exec', function(cb) {
cb(null, ['OK', 2]);
});
var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(err);
});
});
it('should reject if lpush response has unexpected value', function() {
var sandbox = sinon.sandbox.create();
var s0 = sinon.stub(queue.client, 'multi').returnsThis();
var s1 = sinon.stub(queue.client, 'hmset').returnsThis();
var s2 = sinon.stub(queue.client, 'lrem').returnsThis();
var s3 = sinon.stub(queue.client, 'lpush').returnsThis();
var s4 = sinon.stub(queue.client, 'exec', function(cb) {
cb(null, ['OK', 0, 0]);
});
var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(err);
});
});
});
describe('toClient', function() {
it('should convert job into redis format', function() {
describe('remove', function() {
it('should remove job from queue and purge job data', function() {
var job = {

@@ -170,21 +308,50 @@ id: 1

, retry: 0
, timeout: 120
, timeout: 60
};
expect(queue.toClient(job)).to.deep.equal({
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
return queue.remove(job.id).then(function() {
return queue.count('run').then(function(count) {
expect(count).to.equal(0);
return expect(queue.get(job.id)).to.eventually.be.rejectedWith(Error);
});
});
});
it('should reject if redis return empty error', function() {
var job = {
id: 1
, data: '{"a":1}'
, data: { a: 1 }
, retry: 0
, timeout: 120
, timeout: 60
};
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
var sandbox = sinon.sandbox.create();
var s0 = sandbox.stub(queue.client, 'multi').returnsThis();
var s1 = sandbox.stub(queue.client, 'lrem').returnsThis();
var s2 = sandbox.stub(queue.client, 'del').returnsThis();
var s3 = sandbox.stub(queue.client, 'exec', function(cb) {
cb([], null);
});
return queue.remove({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(s0).to.have.been.calledOnce;
expect(s1).to.have.been.calledOnce;
expect(s2).to.have.been.calledOnce;
expect(s3).to.have.been.calledOnce;
expect(err).to.be.an.instanceof(Error);
});
});
it('should trigger error if job data is not serializable', function() {
var testObj = {};
testObj.key = 'value';
testObj.cycle = testObj;
it('should reject if redis return array of errors', function() {
var job = {
id: 1
, data: testObj
, data: { a: 1 }
, retry: 0

@@ -194,27 +361,86 @@ , timeout: 60

expect(function() { queue.toClient(job) }).to.throw(Error);
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
var p = queue.remove(job.id);
queue.client.stream.destroy();
return expect(p).to.eventually.be.rejectedWith(Error);
});
});
describe('fromClient', function() {
it('should convert redis job into original format', function() {
it('should reject if job id is missing', function() {
var job = {
id: '1'
, data: '{"a":1}'
, retry: '0'
, timeout: '120'
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
expect(queue.fromClient(job)).to.deep.equal({
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error);
});
it('should reject if job data is missing', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 120
, timeout: 60
};
queue.client.lpush(queue.runQueue, job.id);
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error);
});
});
describe('count', function() {
it('should return work queue job count by default', function() {
return queue.add({ a: 1 }).then(function() {
return expect(queue.count()).to.eventually.equal(1);
});
});
it('should trigger error if redis job data is invalid', function() {
it('should return queue job count for specified queue', function() {
queue.client.lpush(queue.runQueue, 1);
return expect(queue.count('run')).to.eventually.equal(1);
});
it('should reject if queue name is invalid', function() {
return expect(queue.count('invalid')).to.eventually.be.rejectedWith(Error);
});
it('should reject if queue data is invalid', function() {
queue.client.set(queue.workQueue, 1);
return expect(queue.count()).to.eventually.be.rejectedWith(Error);
});
});
describe('get', function() {
it('should reject if no id given', function() {
return expect(queue.get()).to.eventually.be.rejectedWith(Error);
});
it('should return the job', function() {
return queue.add({ a: 1 }).then(function(job) {
return expect(queue.get(job.id)).to.eventually.be.fulfilled;
});
});
it('should return the job properly formatted', function() {
return queue.add({ a: 1 }).then(function(j1) {
return queue.get(j1.id).then(function(j2) {
expect(j2.id).to.equal(j1.id);
expect(j2.data).to.deep.equal(j1.data);
expect(j2.retry).to.equal(j1.retry);
expect(j2.timeout).to.equal(j1.timeout);
});
});
});
it('should reject on connection failure', function() {
var job = {
id: 1
, data: 'a:1'
, data: { a: 1 }
, retry: 0

@@ -224,6 +450,50 @@ , timeout: 60

expect(function() { queue.fromClient(job) }).to.throw(Error);
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
var p = queue.get(job.id);
queue.client.stream.destroy();
return expect(p).to.eventually.be.rejectedWith(Error);
});
it('should reject if job data is missing', function() {
return expect(queue.get(1)).to.eventually.be.rejectedWith(Error);
});
it('should reject if job data is invalid', function() {
var job = {
id: '1'
, data: 'a:1'
, retry: '0'
, timeout: '60'
};
queue.client.hmset(queue.prefix + ':' + job.id, job);
return expect(queue.get(job.id)).to.eventually.be.rejectedWith(Error);
});
});
describe('stop', function() {
it('should set shutdown to true', function() {
queue.stop();
expect(queue.shutdown).to.be.true;
});
});
describe('restart', function() {
it('should restart listener', function() {
var stub = sinon.stub(queue, 'listen');
queue.restart();
expect(stub).to.have.been.calledOnce;
});
it('should throw error if already running', function() {
var stub = sinon.stub(queue, 'listen');
queue.running = true;
expect(function() { queue.restart(); }).to.throw(Error);
expect(stub).to.not.have.been.called;
});
});
describe('worker', function() {

@@ -304,6 +574,4 @@ it('should register a job handler', function() {

var s0 = sinon.stub(queue, 'nextJob');
var s1 = sinon.stub(queue, 'handleStatus');
var s1 = sinon.stub(queue, 'readJob');
var s2 = sinon.stub(queue, 'handleJob');
s0.returns(Promise.resolve(true));
s1.returns(Promise.resolve(true));

@@ -314,3 +582,2 @@ s2.onCall(0).returns(Promise.resolve(true));

return queue.run().catch(function(err) {
expect(s0).to.have.been.calledTwice;
expect(s1).to.have.been.calledTwice;

@@ -327,8 +594,7 @@ expect(s2).to.have.been.calledTwice;

resolve();
}, 10);
}, 25);
});
};
var s0 = sinon.stub(queue, 'nextJob', p);
var s1 = sinon.stub(queue, 'handleStatus', p);
var s1 = sinon.stub(queue, 'readJob', p);
var s2 = sinon.stub(queue, 'handleJob', p);

@@ -341,8 +607,7 @@

queue.shutdown = true;
}, 25);
}, 80);
return queue.run().then(function() {
expect(s0).to.have.been.calledOnce;
expect(s1).to.have.been.calledOnce;
expect(s2).to.have.been.calledOnce;
expect(s1).to.have.been.calledTwice;
expect(s2).to.have.been.calledTwice;
expect(spy).to.have.been.calledOnce;

@@ -353,3 +618,3 @@ });

describe('handleStatus', function() {
describe('readJob', function() {
it('should get next job again if input is status_timeout', function() {

@@ -366,3 +631,3 @@ var job = {

return queue.handleStatus(queue.status_timeout).then(function(res) {
return queue.readJob(queue.status_timeout).then(function(res) {
expect(stub).to.have.been.calledOnce;

@@ -373,3 +638,3 @@ expect(res).to.equal(job);

it('should pass on job object unchanged', function() {
it('should pass on job object as promise', function() {
var job = {

@@ -382,3 +647,3 @@ id: 1

return expect(queue.handleStatus(job)).to.equal(job);
return expect(queue.readJob(job)).to.eventually.equal(job);
});

@@ -677,3 +942,3 @@ });

it('should reject on connection failure', function() {
it('should reject if redis return empty error', function() {
var job = {

@@ -689,2 +954,32 @@ id: 1

var sandbox = sinon.sandbox.create();
var s0 = sandbox.stub(queue.client, 'multi').returnsThis();
var s1 = sandbox.stub(queue.client, 'rpoplpush').returnsThis();
var s2 = sandbox.stub(queue.client, 'hset').returnsThis();
var s3 = sandbox.stub(queue.client, 'exec', function(cb) {
cb([], null);
});
return queue.moveJob(job).catch(function(err) {
sandbox.restore();
expect(s0).to.have.been.calledOnce;
expect(s1).to.have.been.calledOnce;
expect(s2).to.have.been.calledOnce;
expect(s3).to.have.been.calledOnce;
expect(err).to.be.an.instanceof(Error);
});
});
it('should reject if redis return array of errors', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
var p = queue.moveJob(job);

@@ -722,4 +1017,8 @@ queue.client.stream.destroy();

describe('remove', function() {
it('should remove job from queue and purge job data', function() {
describe('recoverJob', function() {
it('should do nothing if no job found in run queue', function() {
return expect(queue.recoverJob()).to.eventually.be.fulfilled;
});
it('should recover job from run queue', function() {
var job = {

@@ -735,11 +1034,8 @@ id: 1

return queue.remove(job.id).then(function() {
return queue.count('run').then(function(count) {
expect(count).to.equal(0);
return expect(queue.get(job.id)).to.eventually.be.rejectedWith(Error);
});
return queue.recoverJob().then(function() {
return expect(queue.count('run')).to.eventually.equal(0);
});
});
it('should reject on connection failure', function() {
it('should recover job into work queue', function() {
var job = {

@@ -755,10 +1051,9 @@ id: 1

var p = queue.remove(job.id);
queue.client.stream.destroy();
return expect(p).to.eventually.be.rejectedWith(Error);
return queue.recoverJob().then(function() {
return expect(queue.count('work')).to.eventually.equal(1);
});
});
it('should reject if job id is missing', function() {
var job = {
it('should reject if run queue has more than 1 job', function() {
var j1 = {
id: 1

@@ -770,9 +1065,5 @@ , data: { a: 1 }

return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error);
});
it('should reject if job data is missing', function() {
var job = {
id: 1
, data: { a: 1 }
var j2 = {
id: 2
, data: { b: 1 }
, retry: 0

@@ -782,125 +1073,55 @@ , timeout: 60

queue.client.lpush(queue.runQueue, job.id);
queue.client.hmset(queue.prefix + ':' + j1.id, queue.toClient(j1));
queue.client.lpush(queue.runQueue, j1.id);
queue.client.hmset(queue.prefix + ':' + j2.id, queue.toClient(j2));
queue.client.lpush(queue.runQueue, j2.id);
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error);
return expect(queue.recoverJob()).to.eventually.be.rejectedWith(Error);
});
});
describe('stop', function() {
it('should set shutdown to true', function() {
queue.stop();
expect(queue.shutdown).to.be.true;
});
});
it('should reject on connection failure', function() {
var stub = sinon.stub(queue, 'count').returns(Promise.resolve(1));
describe('restart', function() {
it('should restart listener', function() {
var stub = sinon.stub(queue, 'listen');
queue.restart();
expect(stub).to.have.been.calledOnce;
});
});
var p = queue.recoverJob();
queue.client.stream.destroy();
describe('count', function() {
it('should return work queue job count by default', function() {
return queue.add({ a: 1 }).then(function() {
return expect(queue.count()).to.eventually.equal(1);
});
return expect(p).to.eventually.be.rejectedWith(Error);
});
it('should return queue job count for specified queue', function() {
queue.client.lpush(queue.runQueue, 1);
it('should reject if response is unexpected', function() {
var stub = sinon.stub(queue, 'count').returns(Promise.resolve(1));
return expect(queue.count('run')).to.eventually.equal(1);
return expect(queue.recoverJob()).to.eventually.be.rejectedWith(Error);
});
it('should reject if queue name is invalid', function() {
return expect(queue.count('invalid')).to.eventually.be.rejectedWith(Error);
});
it('should reject if queue data is invalid', function() {
queue.client.set(queue.workQueue, 1);
return expect(queue.count()).to.eventually.be.rejectedWith(Error);
});
});
describe('get', function() {
it('should reject if no id given', function() {
return expect(queue.get()).to.eventually.be.rejectedWith(Error);
describe('nextId', function() {
it('should setup queue id and return it', function() {
return expect(queue.nextId()).to.eventually.equal(1);
});
it('should return the job', function() {
return queue.add({ a: 1 }).then(function(job) {
return expect(queue.get(job.id)).to.eventually.be.fulfilled;
});
});
it('should increment queue id and return it', function() {
queue.client.set(queue.prefix + ':id', 5);
it('should return the job properly formatted', function() {
return queue.add({ a: 1 }).then(function(j1) {
return queue.get(j1.id).then(function(j2) {
expect(j2.id).to.equal(j1.id);
expect(j2.data).to.deep.equal(j1.data);
expect(j2.retry).to.equal(j1.retry);
expect(j2.timeout).to.equal(j1.timeout);
});
});
return expect(queue.nextId()).to.eventually.equal(6);
});
it('should reject on connection failure', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
it('should reject if queue id is not number', function() {
queue.client.hmset(queue.prefix + ':id', { a: 1 });
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
var p = queue.get(job.id);
queue.client.stream.destroy();
return expect(p).to.eventually.be.rejectedWith(Error);
return expect(queue.nextId()).to.eventually.be.rejectedWith(Error);
});
it('should reject if job data is missing', function() {
return expect(queue.get(1)).to.eventually.be.rejectedWith(Error);
});
it('should reject if job data is invalid', function() {
var job = {
id: '1'
, data: 'a:1'
, retry: '0'
, timeout: '60'
};
queue.client.hmset(queue.prefix + ':' + job.id, job);
return expect(queue.get(job.id)).to.eventually.be.rejectedWith(Error);
});
});
describe('add', function() {
it('should reject empty data', function() {
return expect(queue.add()).to.eventually.be.rejectedWith(Error);
describe('nextJob', function() {
it('should return the next job on queue', function() {
queue.add({ a: 1 });
return expect(queue.nextJob()).to.eventually.be.fulfilled;
});
it('should reject non-object data', function() {
return expect(queue.add('invalid')).to.eventually.be.rejectedWith(Error);
});
it('should return the job properly formatted', function() {
queue.add({ a: 1 })
it('should add a new job to queue', function() {
return queue.add({ a: 1 }).then(function(job) {
queue.client.hgetall(queue.prefix + ':' + job.id, function(err, res) {
expect(res.id).to.equal('1');
expect(res.data).to.equal(JSON.stringify({ a: 1 }));
expect(res.retry).to.equal('0');
expect(res.timeout).to.equal('60');
});
});
});
it('should return the added job', function() {
return queue.add({ a: 1 }).then(function(job) {
return queue.nextJob().then(function(job) {
expect(job.id).to.equal(1);

@@ -913,66 +1134,90 @@ expect(job.data).to.deep.equal({ a: 1 });

it('should allow custom job options', function() {
return queue.add({ a: 1 }, { retry: 1, timeout: 120 }).then(function(job) {
expect(job.id).to.equal(1);
expect(job.data).to.deep.equal({ a: 1 });
expect(job.retry).to.equal(1);
expect(job.timeout).to.equal(120);
});
it('should wait for next job to be available', function() {
setTimeout(function() {
queue.add({ a: 1 });
}, 25);
return expect(queue.nextJob()).to.eventually.be.fulfilled;
});
it('should overwrite existing job, and requeue job id', function() {
return queue.add({ a: 1 }, { timeout: 120 }).then(function(job) {
return queue.add({ b: 1 }, { id: job.id }).then(function() {
queue.client.hgetall(queue.prefix + ':' + job.id, function(err, res) {
expect(res.id).to.equal('1');
expect(res.data).to.equal(JSON.stringify({ b: 1 }));
expect(res.retry).to.equal('0');
expect(res.timeout).to.equal('60');
});
});
it('should block for n seconds before returning status_timeout', function() {
var stub = sinon.stub(queue.bclient, 'brpoplpush', function(a1, a2, a3, cb) {
setTimeout(function() {
cb(null, null);
}, 25);
});
return expect(queue.nextJob()).to.eventually.equal(queue.status_timeout);
});
it('should increment job id on each call', function() {
queue.add({ a: 1 });
queue.add({ b: 1 });
return queue.add({ c: 1 }).then(function(job) {
expect(job.id).to.equal(3);
expect(job.data).to.deep.equal({ c: 1 });
it('should reject on connection failure', function() {
var p = queue.nextJob();
setTimeout(function() {
queue.bclient.stream.destroy();
}, 25);
return expect(p).to.eventually.be.rejectedWith(Error);
});
});
describe('toClient', function() {
it('should convert job into redis format', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 120
};
expect(queue.toClient(job)).to.deep.equal({
id: 1
, data: '{"a":1}'
, retry: 0
, timeout: 120
});
});
it('should reject if data has cyclic structure', function() {
it('should trigger error if job data is not serializable', function() {
var testObj = {};
testObj.key = 'value';
testObj.cycle = testObj;
return expect(queue.add(testObj)).to.eventually.be.rejectedWith(Error);
});
it('should emit event when done', function() {
var spy = sinon.spy();
queue.on('add ok', spy);
var job = {
id: 1
, data: testObj
, retry: 0
, timeout: 60
};
return queue.add({ a: 1 }).then(function(job) {
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWithMatch(job);
});
expect(function() { queue.toClient(job) }).to.throw(Error);
});
});
it('should emit event when failed', function() {
queue.client.set(queue.workQueue, 1);
describe('fromClient', function() {
it('should convert redis job into original format', function() {
var job = {
id: '1'
, data: '{"a":1}'
, retry: '0'
, timeout: '120'
};
var spy = sinon.spy();
queue.on('add error', spy);
return expect(queue.add({ a: 1 })).to.eventually.be.rejectedWith(Error);
expect(queue.fromClient(job)).to.deep.equal({
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 120
});
});
it('should reject on connection failure', function() {
var p = queue.add({ a: 1 });
it('should trigger error if redis job data is invalid', function() {
var job = {
id: 1
, data: 'a:1'
, retry: 0
, timeout: 60
};
// TODO: trigger exec error handling, currently the error is from redis client
queue.client.stream.destroy();
return expect(p).to.eventually.be.rejectedWith(Error);
expect(function() { queue.fromClient(job) }).to.throw(Error);
});

@@ -979,0 +1224,0 @@ });

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc