New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

decent

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

decent - npm Package Compare versions

Comparing version 0.2.0 to 1.0.1

35

CHANGELOG.md

@@ -5,20 +5,30 @@

# 0.2.x release
# 1.x release
## v1.0.1 (master)
- Fix: node v0.10 promise support
## v1.0.0
- Feature: job item now includes their current queue name for better tracking
- Enhance: remove obsolete code and tests
- Break: Redis client and add/remove events are removed
# 0.x release
## v0.2.0
- Change: rename private api `listen` to `start`
- Feature: new event `queue work` before worker start processing each job
- Feature: `add error` and `queue error` events now emits related job as second parameter
- Feature: `remove` can now remove job from any queue
- Enhance: prevent potential memory leak with `run` loop due to long promise chain
- Fix: prevent potential memory leak with `run` loop due to unresolved promise chain
# 0.1.x release
## v0.1.6
- Enhance: stalled runQueue job are moved to workQueue on startup
- Enhance: update to verbose error message
- Fix: stalled runQueue job are moved to workQueue on startup
- Enhance: verbose error message
- Enhance: reached 100% code coverage
- Enhance: mark api as public/private

@@ -28,13 +38,10 @@ ## v0.1.5

- Fix: async worker api
- Fix: documentation update
- Enhance: better code coverage
## v0.1.4
- Fix: run loop to have correct blocking mechanism
- Enhance: refactor job remove into its own method
- Enhance: better code coverage with fake server response
- Fix: run loop
## v0.1.3
- Major: initial public release
- initial public release

17

index.js

@@ -8,17 +8,2 @@

var Decent = require('./lib/decent');
module.exports = factory;
/**
* Create an instance of Decent
*
* @param String name Name of this queue
* @param Object opts Redis options
* @return Object
*/
function factory(name, opts) {
return new Decent(name, opts);
};
module.exports = require('./lib/decent');

@@ -11,3 +11,2 @@

var Redis = require('redis');
var Promise = require('native-or-bluebird');
var EventEmitter = require('events').EventEmitter;

@@ -29,2 +28,6 @@ var inherits = require('util').inherits;

// allow call as function
if (!(this instanceof Queue))
return new Queue(name, opts);
if (!name) {

@@ -43,3 +46,3 @@ throw new Error('queue name is required');

this.config = {
blockTimeout: opts.blockTimeout || 30
blockTimeout: opts.blockTimeout || 60
, maxRetry: opts.maxRetry || 3

@@ -54,9 +57,2 @@ };

this.client.on('ready', this.clientReady.bind(this));
this.client.on('connect', this.clientConnect.bind(this));
this.client.on('error', this.clientError.bind(this));
this.client.on('end', this.clientEnd.bind(this));
this.client.on('drain', this.clientDrain.bind(this));
this.client.on('idle', this.clientIdle.bind(this));
// client for blocking command only

@@ -107,2 +103,3 @@ this.bclient = Redis.createClient(this.port, this.host, this.opts);

, timeout: opts.timeout || 60
, queue: self.workQueue
};

@@ -115,3 +112,3 @@

// note that overwrite existing job will requeue job id
return new Promise(function(resolve, reject) {
return new Queue.Promise(function(resolve, reject) {

@@ -125,36 +122,13 @@ self.client.multi()

if (err) {
// only return the first error
// redis client SHOULD always return non-empty array of Error
// but better safe than sorry
if (err.length > 0) {
err = err[0];
} else {
err = new Error('empty error from redis client');
}
reject(err[0]);
self.emit('add error', err, job);
reject(err);
// command failure
// we need to check each command is returning expected result
// we need to check commands are returning expected result
// as err is null in this case, ref: http://git.io/bT5C4Q
// redis client SHOULD always return OK for set command
} else if (res[0] !== 'OK') {
err = new Error('fail to create job data on redis');
self.emit('add error', err, job);
reject(err);
// duplicate job id should be purged
} else if (isNaN(parseInt(res[1], '10')) || res[1] > 1) {
err = new Error('fail to remove duplicate job id from queue');
self.emit('add error', err, job);
reject(err);
// redis client SHOULD NOT fail if 2nd command is successful
// job queue must not be empty now
} else if (isNaN(parseInt(res[2], '10')) || res[2] < 1) {
err = new Error('fail to push job id onto queue');
self.emit('add error', err, job);
reject(err);
} else {
self.emit('add ok', job);
resolve(job);

@@ -184,3 +158,3 @@ }

// job done, remove it
return new Promise(function(resolve, reject) {
return new Queue.Promise(function(resolve, reject) {

@@ -197,13 +171,7 @@ if (!self[name + 'Queue']) {

// see prototype.add on why verbose checks are needed
// client error
if (err) {
if (err.length > 0) {
err = err[0];
} else {
err = new Error('empty error from redis client');
}
// only return the first error
reject(err[0]);
reject(err);
// command error

@@ -237,3 +205,3 @@ } else if (res[0] != 1) {

return new Promise(function(resolve, reject) {
return new Queue.Promise(function(resolve, reject) {

@@ -258,3 +226,3 @@ if (!self[name + 'Queue']) {

/**
* Return a job without removing it from redis
* Return job data without removing it from redis
*

@@ -269,3 +237,3 @@ * @param Number id Job id

return new Promise(function(resolve, reject) {
return new Queue.Promise(function(resolve, reject) {

@@ -427,3 +395,3 @@ if (!id) {

} else {
return Promise.resolve(res);
return Queue.Promise.resolve(res);
}

@@ -445,3 +413,3 @@

// this promise always resolve, errors are handled
return new Promise(function(resolve, reject) {
return new Queue.Promise(function(resolve, reject) {

@@ -489,4 +457,8 @@ // start working on job

// job failure, move job to appropriate queue
return self.moveJob(job).then(function() {
self.emit('queue error', err, job);
return self.moveJob(job).then(function(job) {
if (job.queue === self.failQueue) {
self.emit('queue failure', err, job);
} else {
self.emit('queue error', err, job);
}
});

@@ -509,11 +481,13 @@

return new Promise(function(resolve, reject) {
return new Queue.Promise(function(resolve, reject) {
var multi = self.client.multi();
// check retry limit
// check retry limit, decide next queue
if (job.retry >= self.config.maxRetry) {
multi.rpoplpush(self.runQueue, self.failQueue);
job.queue = self.failQueue;
} else {
multi.rpoplpush(self.runQueue, self.workQueue);
job.queue = self.workQueue;
}

@@ -524,16 +498,11 @@

multi.hset(self.prefix + ':' + job.id, 'retry', job.retry);
multi.hset(self.prefix + ':' + job.id, 'queue', job.queue);
multi.exec(function(err, res) {
// see prototype.add on why verbose checks are needed
// client error
if (err) {
if (err.length > 0) {
err = err[0];
} else {
err = new Error('empty error from redis client');
}
// only return the first error
reject(err[0]);
reject(err);
// command error

@@ -546,4 +515,7 @@ } else if (res[0] === null) {

reject(err);
} else if (isNaN(parseInt(res[2], '10')) || res[2] > 0) {
err = new Error('partial job data, queue name missing');
reject(err);
} else {
resolve();
resolve(job);
}

@@ -574,3 +546,3 @@ });

} else if (count > 1) {
return Promise.reject(
return Queue.Promise.reject(
new Error('more than 1 job in queue, purge manually')

@@ -581,3 +553,3 @@ );

} else {
return new Promise(function(resolve, reject) {
return new Queue.Promise(function(resolve, reject) {
self.client.rpoplpush(self.runQueue, self.workQueue, function(err, res) {

@@ -609,3 +581,3 @@ if (err) {

return new Promise(function(resolve, reject) {
return new Queue.Promise(function(resolve, reject) {

@@ -634,3 +606,3 @@ self.client.incr(self.prefix + ':id', function(err, res) {

return new Promise(function(resolve, reject) {
return new Queue.Promise(function(resolve, reject) {

@@ -675,2 +647,3 @@ self.bclient.brpoplpush(

, timeout: job.timeout
, queue: job.queue
};

@@ -695,2 +668,3 @@

, timeout: parseInt(job.timeout, 10)
, queue: job.queue
};

@@ -700,87 +674,3 @@

/**
* Handle redis client ready event
*
* @return Void
* @api Private
*/
Queue.prototype.clientReady = function() {
if (!this.opts.no_ready_check) {
this.emit('client ready');
}
};
/**
* Handle redis client connect event
*
* @return Void
* @api Private
*/
Queue.prototype.clientConnect = function() {
if (this.opts.no_ready_check) {
this.emit('client ready');
}
};
/**
* Handle redis client error event
*
* @param Object err Error from redis client
* @return Void
* @api Private
*/
Queue.prototype.clientError = function(err) {
if (err instanceof Error) {
this.emit('client error', err);
}
};
/**
* Handle redis client end event
*
* @return Void
* @api Private
*/
Queue.prototype.clientEnd = function() {
this.emit('client close');
};
/**
* Handle redis client drain event
*
* @return Void
* @api Private
*/
Queue.prototype.clientDrain = function() {
var l = this.client.command_queue.length;
if (l > 0) {
this.idle = false;
this.emit('client pressure', l);
}
};
/**
* Handle redis client idle event
*
* @return Void
* @api Private
*/
Queue.prototype.clientIdle = function() {
if (!this.idle) {
this.idle = true;
this.emit('client pressure', 0);
}
};
// allow custom promise module
Queue.Promise = global.Promise;
The MIT License (MIT)
Copyright (c) 2014 David Frank
Copyright (c) 2015 David Frank

@@ -5,0 +5,0 @@ Permission is hereby granted, free of charge, to any person obtaining a copy

{
"name": "decent",
"version": "0.2.0",
"version": "1.0.1",
"description": "This is a decent Redis-based job queue for Node.",

@@ -28,7 +28,6 @@ "main": "index.js",

"dependencies": {
"bluebird": "^2.3.11",
"native-or-bluebird": "^1.1.2",
"redis": "^0.12.1"
},
"devDependencies": {
"bluebird": "^2.9.6",
"chai": "^1.10.0",

@@ -35,0 +34,0 @@ "chai-as-promised": "^4.1.1",

@@ -5,14 +5,15 @@

[![npm version](https://badge.fury.io/js/decent.svg)](http://badge.fury.io/js/decent) [![Build Status](https://travis-ci.org/bitinn/decent.svg)](https://travis-ci.org/bitinn/decent) [![Coverage Status](https://img.shields.io/coveralls/bitinn/decent.svg)](https://coveralls.io/r/bitinn/decent) [![Dependency Status](https://david-dm.org/bitinn/decent.svg)](https://david-dm.org/bitinn/decent)
[![npm version][npm-image]][npm-url]
[![build status][travis-image]][travis-url]
[![coverage status][coveralls-image]][coveralls-url]
[![dependency status][david-image]][david-url]
`decent` is a Redis-based job queue for Node.
`decent` is a decent Redis job queue for Node.js
*Job queue is hard to manage, we make it decent for you.*
# Motivation
Despite efforts from brilliant developers, a reliable job queue using node.js and redis is still somewhat of a mythical beast. And no wonder: redis isn't a queueing solution by itself and node.js isn't known for superior error handling; add concurrency into the mix and you got a leaky pipeline that's almost impossible to debug.
There are powerful job queue modules for node.js + redis out there, like [kue](https://github.com/LearnBoost/kue) and [bull](https://github.com/OptimalBits/bull), to name a few. But powerful API comes at a price, they need complex data structure and redis scripts to achieve features such as delayed job, pause/resume and full text search. And since redis doesn't have traditional transaction, ie. [no rollback when one of the command failed](http://redis.io/topics/transactions), and [doesn't trigger error in node-redis driver](https://github.com/mranney/node_redis/issues/689), things can go south without developers noticing. Plus it's difficult to figure out what really happened due to non-intuitive redis data structure.
In short, we need better groundwork before we can harness the power of queue. Hence the birth of `decent`: we want a library that provides solid building blocks for complex pipelines, so we can safely enjoy what job queue has to offer.
To us, the proper answer is to design around this problem, instead of adding more features, we want a job queue that's barebone, fully tested, easy to inspect, and doesn't hide errors from developers.

@@ -22,8 +23,6 @@

- **Simple API**, powered by `Promise`, works in harmony with your generator library.
- **Automatic job clean up and recovery**, no need to purge jobs manually.
- **Proper code coverage**, we put extra emphasis on negative tests, because that's when most queues fall apart and cause headaches.
- **Annotated source code**, less than 800 loc in total.
- No dependency besides `redis` driver, make use of native promise whenever possible, fallback to `bluebird` for older Node release.
- Rich events to aid automation, status monitoring or building larger pipeline.
- Simple API with promise, works with your co/koa/whatever generator library.
- Automatic job recovery that ease queue shutdown and restart.
- Make use of native promise, and allow your favorite alternative.
- Only dependency is the redis driver.

@@ -36,5 +35,9 @@

# Usage
TODO
# API
## decent(name, opts)

@@ -61,3 +64,3 @@

- `host`: redis server host, default to `'127.0.0.1'`
- `blockTimeout`: how long a client should wait for next job (see redis document on blocking command, such as [BLPOP](http://redis.io/commands/BLPOP)), defaults to `30` seconds, `0` to block forever.
- `blockTimeout`: how long a client should wait for next job (see redis document on blocking command, such as [BLPOP](http://redis.io/commands/BLPOP)), defaults to `60` seconds, `0` to block forever.
- `maxRetry`: how many retries a job can have before being moved to failure queue, defaults to `3`, `0` to disable retry.

@@ -80,2 +83,4 @@ - and all [redis client options](https://github.com/mranney/node_redis#rediscreateclient).

console.log(job.data); // { a: 1, b: 1 }
console.log(job.retry); // 1
console.log(job.timeout); // 120
});

@@ -95,2 +100,3 @@ ```

- `timeout`: how many seconds a worker can run before it's terminated.
- `queue`: which queue this job currently belongs to.

@@ -106,6 +112,6 @@

queue.worker(function(job, done) {
// ... do actual work
done();
setTimeout(function() {
console.log(job.data);
done();
}, 100);
});

@@ -157,11 +163,13 @@ ```

## queue.remove(id)
## queue.remove(id, name)
Returns a promise that will resolve when job is removed from redis (both job data and job queue).
Returns a promise that will resolve when job is removed from redis (both job data and job queue). Default queue is `work`.
Note: `remove` does not return the job, use `get` then `remove` instead.
### examples
```
queue.remove(1).then(function() {
// ...
queue.remove(1, 'run').then(function() {
// job has been removed from redis
});

@@ -178,2 +186,7 @@ ```

```
// ... setup queue and worker
queue.on('queue stop', function() {
console.log('queue stopped gracefully');
});
queue.stop();

@@ -190,2 +203,7 @@ ```

```
// ... setup queue and worker
queue.on('queue start', function() {
console.log('queue restarted');
});
queue.restart();

@@ -199,9 +217,2 @@ ```

## Redis client related
- `queue.emit('client ready')`: client is ready. (redis client has buffer built-in, so this event is emitted as soon as redis client is started.)
- `queue.emit('client error', err)`: client connection experiences error.
- `queue.emit('client close')`: client connection has been closed.
- `queue.emit('client pressure', number)`: pending number of commands, useful for rate limiting.
## queue worker related

@@ -212,28 +223,8 @@

- `queue.emit('queue ok', job)`: queue worker has processed a `job`.
- `queue.emit('queue error', err, job)`: queue worker has failed to processed a `job` and thrown `err` (caught properly, so queue does not exit)
- `queue.emit('queue exit', err)`: queue loop has terminated due to `err`.
- `queue.emit('queue error', err, job)`: queue worker has failed to processed a `job` and thrown `err`, will retry later.
- `queue.emit('queue failure', err, job)`: queue worker has failed to processed a `job` and thrown `err`, retry limit reached.
- `queue.emit('queue exit', err)`: queue loop has terminated due to unhandled `err`.
- `queue.emit('queue stop')`: queue loop has stopped gracefully.
## queue client related
- `queue.emit('add ok', job)`: a `job` has been added to queue.
- `queue.emit('add error', err, job)`: failed to add a `job` onto queue due to `err`.
# Development
```
npm install
npm test
```
Feel feel to raise any issues or feature requests, note that we do intend to keep this API simple, and all changes must be well-tested.
# Future plan
- Use case examples
- Web UI
# License

@@ -243,1 +234,9 @@

[npm-image]: https://img.shields.io/npm/v/decent.svg?style=flat-square
[npm-url]: https://www.npmjs.com/package/decent
[travis-image]: https://img.shields.io/travis/bitinn/decent.svg?style=flat-square
[travis-url]: https://travis-ci.org/bitinn/decent
[coveralls-image]: https://img.shields.io/coveralls/bitinn/decent.svg?style=flat-square
[coveralls-url]: https://coveralls.io/r/bitinn/decent
[david-image]: https://img.shields.io/david/bitinn/decent.svg?style=flat-square
[david-url]: https://david-dm.org/bitinn/decent

@@ -15,5 +15,6 @@

var decent = require('../index.js');
var QueueClass = require('../lib/decent.js');
// fallback to bluebird promise on node v0.10
var Promise = global.Promise || require('bluebird');
decent.Promise = Promise;
var EventEmitter = require('events').EventEmitter;
var Promise = require('native-or-bluebird');

@@ -59,3 +60,3 @@ // global vars

it('should return a decent instance', function() {
expect(queue).to.be.an.instanceof(QueueClass);
expect(queue).to.be.an.instanceof(decent);

@@ -67,3 +68,3 @@ expect(queue.name).to.equal('test');

expect(queue.config).to.be.an('object');
expect(queue.config.blockTimeout).to.equal(30);
expect(queue.config.blockTimeout).to.equal(60);
expect(queue.config.maxRetry).to.equal(3);

@@ -170,57 +171,2 @@

it('should emit event when done', function() {
var spy = sinon.spy();
queue.on('add ok', spy);
return queue.add({ a: 1 }).then(function(job) {
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWithMatch(job);
});
});
it('should emit event when failed', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
queue.client.set(queue.workQueue, 1);
var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
expect(spy.args[0][0]).to.be.an.instanceof(Error);
expect(spy.args[0][1]).to.deep.equal(job);
});
});
it('should reject if redis return empty error', function() {
var sandbox = sinon.sandbox.create();
var s0 = sandbox.stub(queue.client, 'multi').returnsThis();
var s1 = sandbox.stub(queue.client, 'hmset').returnsThis();
var s2 = sandbox.stub(queue.client, 'lrem').returnsThis();
var s3 = sandbox.stub(queue.client, 'lpush').returnsThis();
var s4 = sandbox.stub(queue.client, 'exec', function(cb) {
cb([], null);
});
var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(s0).to.have.been.calledOnce;
expect(s1).to.have.been.calledOnce;
expect(s2).to.have.been.calledOnce;
expect(s3).to.have.been.calledOnce;
expect(s4).to.have.been.calledOnce;
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(err);
});
});
it('should reject if redis return array of errors', function() {

@@ -237,9 +183,4 @@ var error = new Error('some error');

var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(spy).to.have.been.calledWith(error);
expect(err).to.equal(error);

@@ -249,23 +190,2 @@ });

it('should reject if hmset response has unexpected value', function() {
var sandbox = sinon.sandbox.create();
var s0 = sinon.stub(queue.client, 'multi').returnsThis();
var s1 = sinon.stub(queue.client, 'hmset').returnsThis();
var s2 = sinon.stub(queue.client, 'lrem').returnsThis();
var s3 = sinon.stub(queue.client, 'lpush').returnsThis();
var s4 = sinon.stub(queue.client, 'exec', function(cb) {
cb(null, ['NOT OK']);
});
var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(err);
});
});
it('should reject if lrem response has unexpected value', function() {

@@ -281,10 +201,5 @@ var sandbox = sinon.sandbox.create();

var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(err);
expect(err).to.be.an.instanceof(Error);
});

@@ -303,10 +218,5 @@ });

var spy = sinon.spy();
queue.on('add error', spy);
return queue.add({ a: 1 }).catch(function(err) {
sandbox.restore();
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(err);
expect(err).to.equal(error);
});

@@ -323,2 +233,3 @@ });

, timeout: 60
, queue: queue.runQueue
};

@@ -341,32 +252,2 @@

it('should reject if redis return empty error', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
var sandbox = sinon.sandbox.create();
var s0 = sandbox.stub(queue.client, 'multi').returnsThis();
var s1 = sandbox.stub(queue.client, 'lrem').returnsThis();
var s2 = sandbox.stub(queue.client, 'del').returnsThis();
var s3 = sandbox.stub(queue.client, 'exec', function(cb) {
cb([], null);
});
return queue.remove({ a: 1 }, 'run').catch(function(err) {
sandbox.restore();
expect(s0).to.have.been.calledOnce;
expect(s1).to.have.been.calledOnce;
expect(s2).to.have.been.calledOnce;
expect(s3).to.have.been.calledOnce;
expect(err).to.be.an.instanceof(Error);
});
});
it('should reject if redis return array of errors', function() {

@@ -378,2 +259,3 @@ var job = {

, timeout: 60
, queue: queue.runQueue
};

@@ -385,2 +267,5 @@

var p = queue.remove(job.id, 'run');
queue.client.on('error', function(err) {
// hide redis client error
});
queue.client.stream.destroy();

@@ -392,23 +277,9 @@

it('should reject if job id is missing', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error);
return expect(queue.remove(1)).to.eventually.be.rejectedWith(Error);
});
it('should reject if job data is missing', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
queue.client.lpush(queue.workQueue, 1);
queue.client.lpush(queue.workQueue, job.id);
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error);
return expect(queue.remove(1)).to.eventually.be.rejectedWith(Error);
});

@@ -474,2 +345,5 @@ });

var p = queue.get(job.id);
queue.client.on('error', function(err) {
// hide redis client error
});
queue.client.stream.destroy();

@@ -685,2 +559,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -706,5 +581,6 @@

, retry: 0
// don't do this, only integer are supported
// don't do this, only integers are supported
// this is to fake timeout
, timeout: 0.01
, queue: queue.runQueue
};

@@ -738,2 +614,3 @@

, timeout: 0
, queue: queue.runQueue
};

@@ -766,2 +643,3 @@

, timeout: 0
, queue: queue.runQueue
};

@@ -793,2 +671,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -822,2 +701,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -845,2 +725,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -873,2 +754,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -887,4 +769,2 @@

});
var s1 = sinon.stub(queue, 'moveJob');
s1.returns(Promise.resolve(true));

@@ -895,2 +775,3 @@ var spy = sinon.spy();

return queue.handleJob(job).then(function() {
job.retry = 1;
expect(spy).to.have.been.calledOnce;

@@ -901,2 +782,33 @@ expect(spy).to.have.been.calledWith(error, job);

it('should emit failure from handler', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 3
, timeout: 60
, queue: queue.runQueue
};
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
queue.handler = function(job, done) { done() };
var error = new Error('some error');
var s0 = sinon.stub(queue, 'handler', function(job, done) {
setTimeout(function() {
done(error);
}, 25);
});
var spy = sinon.spy();
queue.on('queue failure', spy);
return queue.handleJob(job).then(function() {
job.retry = 1;
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(error, job);
});
});
it('should move job to another queue if handler throw error', function() {

@@ -908,2 +820,3 @@ var job = {

, timeout: 60
, queue: queue.runQueue
};

@@ -940,2 +853,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -959,2 +873,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -978,2 +893,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -991,32 +907,2 @@

it('should reject if redis return empty error', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
var sandbox = sinon.sandbox.create();
var s0 = sandbox.stub(queue.client, 'multi').returnsThis();
var s1 = sandbox.stub(queue.client, 'rpoplpush').returnsThis();
var s2 = sandbox.stub(queue.client, 'hset').returnsThis();
var s3 = sandbox.stub(queue.client, 'exec', function(cb) {
cb([], null);
});
return queue.moveJob(job).catch(function(err) {
sandbox.restore();
expect(s0).to.have.been.calledOnce;
expect(s1).to.have.been.calledOnce;
expect(s2).to.have.been.calledOnce;
expect(s3).to.have.been.calledOnce;
expect(err).to.be.an.instanceof(Error);
});
});
it('should reject if redis return array of errors', function() {

@@ -1028,2 +914,3 @@ var job = {

, timeout: 60
, queue: queue.runQueue
};

@@ -1035,2 +922,5 @@

var p = queue.moveJob(job);
queue.client.on('error', function(err) {
// hide redis client error
});
queue.client.stream.destroy();

@@ -1042,18 +932,27 @@

it('should reject if job id is missing', function() {
return expect(queue.moveJob(1)).to.eventually.be.rejectedWith(Error);
});
it('should reject if job data is missing retry count', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, data: '{"a":1}'
//, retry: 0
, timeout: 60
, queue: queue.runQueue
};
queue.client.hmset(queue.prefix + ':' + job.id, job);
queue.client.lpush(queue.runQueue, job.id);
return expect(queue.moveJob(job)).to.eventually.be.rejectedWith(Error);
});
it('should reject if job data is partial', function() {
it('should reject if job data is missing queue name', function() {
var job = {
id: 1
, data: '{"a":1}'
//, retry: 0
, retry: 0
, timeout: 60
//, queue: queue.runQueue
};

@@ -1079,2 +978,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -1096,2 +996,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -1113,2 +1014,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -1121,2 +1023,3 @@

, timeout: 60
, queue: queue.runQueue
};

@@ -1136,2 +1039,5 @@

var p = queue.recoverJob();
queue.client.on('error', function(err) {
// hide redis client error
});
queue.client.stream.destroy();

@@ -1221,2 +1127,3 @@

, timeout: 120
, queue: queue.runQueue
};

@@ -1229,2 +1136,3 @@

, timeout: 120
, queue: queue.runQueue
});

@@ -1243,2 +1151,3 @@ });

, timeout: 60
, queue: queue.runQueue
};

@@ -1257,2 +1166,3 @@

, timeout: '120'
, queue: queue.runQueue
};

@@ -1265,2 +1175,3 @@

, timeout: 120
, queue: queue.runQueue
});

@@ -1275,2 +1186,3 @@ });

, timeout: 60
, queue: queue.runQueue
};

@@ -1282,80 +1194,2 @@

describe('clientReady', function() {
it('should emit client ready event', function() {
var spy = sinon.spy();
queue.on('client ready', spy);
queue.client.emit('ready');
expect(spy).to.have.been.calledOnce;
});
});
describe('clientConnect', function() {
it('should emit client ready event when no_ready_check is set', function() {
var spy = sinon.spy();
queue.on('client ready', spy);
queue.opts.no_ready_check = true;
queue.client.emit('connect');
expect(spy).to.have.been.calledOnce;
});
});
describe('clientError', function() {
it('should emit client error event with redis client error', function() {
var spy = sinon.spy();
queue.on('client error', spy);
var err = new Error('some error');
queue.client.emit('error', err);
var nonerr = 'not a error';
queue.client.emit('error', nonerr);
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(err);
});
});
describe('clientEnd', function() {
it('should emit client close event', function() {
var spy = sinon.spy();
queue.on('client close', spy);
queue.client.emit('end');
expect(spy).to.have.been.calledOnce;
});
});
describe('clientDrain', function() {
it('should emit client pressure event when cmd queue is non-zero', function() {
var spy = sinon.spy();
queue.on('client pressure', spy);
// on node v0.10+ spy should be called 3 times, 0-1-0 in that order
// on node v0.8 spy can be called more than 3 times
return queue.add({ a: 1 }).then(function() {
expect(queue.idle).to.be.true;
expect(spy).to.have.been.called;
expect(spy).to.have.been.calledWith(1);
});
});
});
describe('clientIdle', function() {
it('should emit client pressure event when no pending cmd', function() {
var spy = sinon.spy();
queue.on('client pressure', spy);
return queue.add({ a: 1 }).then(function() {
expect(queue.idle).to.be.true;
expect(spy).to.have.been.called;
expect(spy).to.have.been.calledWith(0);
});
});
});
describe('use case', function() {

@@ -1362,0 +1196,0 @@ it('should process jobs async', function(testEnd) {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc