New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

decent

Package Overview
Dependencies
Maintainers
1
Versions
8
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

decent - npm Package Compare versions

Comparing version 0.1.4 to 0.1.5

6

CHANGELOG.md

@@ -7,2 +7,8 @@

## v0.1.5
- Fix: async worker api
- Fix: documentation update
- Enhance: better code coverage
## v0.1.4

@@ -9,0 +15,0 @@

97

lib/decent.js

@@ -197,2 +197,4 @@

this.emit('queue start');
return this.run().catch(function(err) {

@@ -264,21 +266,49 @@ self.emit('queue exit', err);

if (!this.handler) {
throw new Error('worker must be registered first');
}
var self = this;
try {
this.handler(job);
} catch(err) {
// move job to next queue
return this.moveJob(job).then(function() {
// emit handler related error
// this promise always resolve, errors are handled
return new Promise(function(resolve, reject) {
if (!self.handler) {
reject(new Error('worker must be registered first'));
return;
}
// support job timeout
if (job.timeout > 0) {
setTimeout(function() {
reject(new Error('job timeout'));
}, job.timeout * 1000);
}
// callback function for async worker
var done = function(input) {
if (input instanceof Error) {
reject(input);
} else {
resolve();
}
};
// catch worker error
try {
self.handler(job, done);
} catch(err) {
reject(err);
}
}).then(function() {
// job done, remove it and emit event
return self.remove(job.id).then(function() {
self.emit('queue ok', job);
});
}).catch(function(err) {
// job failure, move job to appropriate queue
return self.moveJob(job).then(function() {
self.emit('queue error', err);
});
}
// job done, remove it and emit event
return this.remove(job.id).then(function() {
self.emit('queue ok', job);
});

@@ -314,7 +344,10 @@

multi.exec(function(err, res) {
// see prototype.add on why verbose checks are needed
// client error
if (err) {
// only return the first error
if (err.length > 0) {
err = err[0];
} else {
err = new Error('unknown error from redis client');
}

@@ -324,8 +357,8 @@

// see Queue.prototype.add comments on why these check are necessary
// command error
// job id must exist on queue
} else if (res[0] === null) {
err = new Error('unable to find job id on queue');
reject(err);
// response should be 0
// retry field must exist already
} else if (isNaN(parseInt(res[1], '10')) || res[1] > 0) {

@@ -360,7 +393,10 @@ err = new Error('unable to update job retry count');

.exec(function(err, res) {
// see prototype.add on why verbose checks are needed
// client error
if (err) {
// only return the first error
if (err.length > 0) {
err = err[0];
} else {
err = new Error('unknown error from redis client');
}

@@ -370,7 +406,8 @@

// see Queue.prototype.add comments on why these check are necessary
// command error
// job id must exist on queue
} else if (res[0] != 1) {
err = new Error('unable to remove job id');
reject(err);
// job data must exist
} else if (res[1] != 1) {

@@ -402,6 +439,10 @@ err = new Error('unable to remove job data');

*
* @return Promise
* @return Void
*/
//Queue.prototype.restart = function() {};
Queue.prototype.restart = function() {
this.listen();
};
/**

@@ -530,4 +571,8 @@ * Re-run all previously failed jobs

// only return the first error
// redis client SHOULD always return non-empty array of Error
// but better safe than sorry
if (err.length > 0) {
err = err[0];
} else {
err = new Error('unknown error from redis client');
}

@@ -542,2 +587,3 @@

// redis client SHOULD always return OK for set command
} else if (res[0] !== 'OK') {

@@ -547,3 +593,3 @@ err = new Error('unable to set job item');

reject(err);
// response should be 0 or 1
// duplicate job id should be purged
} else if (isNaN(parseInt(res[1], '10')) || res[1] > 1) {

@@ -553,3 +599,4 @@ err = new Error('unable to purge duplicate job id');

reject(err);
// response should be a number and at least 1
// redis client SHOULD NOT fail if 2nd command is successful
// job queue must not be empty now
} else if (isNaN(parseInt(res[2], '10')) || res[2] < 1) {

@@ -556,0 +603,0 @@ err = new Error('unable to add job id');

{
"name": "decent",
"version": "0.1.4",
"version": "0.1.5",
"description": "This is a decent Redis-based job queue for Node.",

@@ -36,3 +36,2 @@ "main": "index.js",

"coveralls": "^2.11.2",
"fakeredis": "^0.2.2",
"istanbul": "^0.3.4",

@@ -39,0 +38,0 @@ "mocha": "^2.0.1",

@@ -5,3 +5,3 @@

[![npm version](https://badge.fury.io/js/decent.svg)](http://badge.fury.io/js/decent) [![Build Status](https://travis-ci.org/bitinn/decent.svg?branch=master)](https://travis-ci.org/bitinn/decent) [![Coverage Status](https://img.shields.io/coveralls/bitinn/decent.svg)](https://coveralls.io/r/bitinn/decent) [![Dependency Status](https://david-dm.org/bitinn/decent.svg)](https://david-dm.org/bitinn/decent)
[![npm version](https://badge.fury.io/js/decent.svg)](http://badge.fury.io/js/decent) [![Build Status](https://travis-ci.org/bitinn/decent.svg)](https://travis-ci.org/bitinn/decent) [![Coverage Status](https://img.shields.io/coveralls/bitinn/decent.svg)](https://coveralls.io/r/bitinn/decent) [![Dependency Status](https://david-dm.org/bitinn/decent.svg)](https://david-dm.org/bitinn/decent)

@@ -12,8 +12,6 @@ `decent` is a Redis-based job queue for Node.

(WIP)
# Motivation
Despite efforts from brilliant developers, a reliable job queue using node.js and redis is still somewhat of a mythical beast. And no wonder: redis isn't a queueing system by nature and node.js isn't known for superior error handling; add concurrency into the mix and you got a leaky pipeline that's almost impossible to debug.
Despite efforts from brilliant developers, a reliable job queue using node.js and redis is still somewhat of a mythical beast. And no wonder: redis isn't a queueing solution by itself and node.js isn't known for superior error handling; add concurrency into the mix and you got a leaky pipeline that's almost impossible to debug.

@@ -25,7 +23,7 @@ In short, we need better groundwork before we can harness the power of queue. Hence the birth of `decent`: we want a library that provides solid building blocks for complex pipelines, so we can safely enjoy what job queue has to offer.

- Simple API and helpers, powered by `Promise`.
- Proper code coverage is a basic requirement, on average at least 3 test cases for each API, and we put extra emphasis on negative tests, because that's where most queue fall and cause headaches.
- Annotated source code, less than 1,000 loc in total.
- **Simple API**, powered by `Promise`, works in harmony with your generator library.
- **Proper code coverage**, we put extra emphasis on negative tests, because that's when most queues fall apart and cause headaches.
- **Annotated source code**, less than 700 loc in total.
- No dependency besides `redis` driver, make use of native promise whenever possible, fallback to `bluebird` for older Node release.
- Rich event to aid automation, monitoring and building larger pipeline.
- Rich events to aid automation, status monitoring or building larger pipeline.

@@ -50,4 +48,8 @@

var q1 = decent('test');
var q2 = decent('test', { port: 6379, host: 'localhost', connect_timeout: 5000 });
var queue1 = decent('q1');
var queue2 = decent('q2', {
port: 6379
, host: 'localhost'
, connect_timeout: 5000
});
```

@@ -59,4 +61,4 @@

- `host`: redis server host, default to `'127.0.0.1'`
- `blockTimeout`: how long should client wait for next job (see redis document on blocking command, such as [BLPOP](http://redis.io/commands/BLPOP)), defaults to `30` seconds, `0` to block forever.
- `maxRetry`: how many retries a job can have before moving to failure queue, defaults to `3`, `0` to disable retry.
- `blockTimeout`: how long a client should wait for next job (see redis document on blocking command, such as [BLPOP](http://redis.io/commands/BLPOP)), defaults to `30` seconds, `0` to block forever.
- `maxRetry`: how many retries a job can have before being moved to failure queue, defaults to `3`, `0` to disable retry.
- and all [redis client options](https://github.com/mranney/node_redis#rediscreateclient).

@@ -77,3 +79,3 @@

queue.add({ a: 1, b: 1 }, { retry: 1, timeout: 120 }).then(function(job) {
console.log(job); // { a: 1, b: 1 }
console.log(job.data); // { a: 1, b: 1 }
});

@@ -84,4 +86,4 @@ ```

- `retry`: default to `0`
- `timeout`: default to `60` seconds
- `retry`: set initial retry counter, default to `0`
- `timeout`: set worker timeout in seconds, default to `60`

@@ -92,4 +94,4 @@ ### job

- `data`: payload
- `retry`: retry count
- `timeout`: worker timeout (not currently used)
- `retry`: current retry count for this job
- `timeout`: how many seconds a worker can run before it's terminated.

@@ -99,3 +101,3 @@

Register a handler function that process jobs, and start processing items in queue.
Register a handler function that process jobs, and start processing jobs in queue.

@@ -105,8 +107,19 @@ ### examples

```
queue.worker(function(job) {
console.log(job);
queue.worker(function(job, done) {
// ... do actual work
done();
});
```
### done(err);
Must be called to signal the completion of job processing.
If called with an instance of `Error`, then `decent` will assume worker failed to process this job.
Fail jobs are moved back to work queue when they are below retry threshold, otherwise they are moved to failure queue.
## queue.count(name)

@@ -120,11 +133,11 @@

queue.count('work').then(function(count) {
console.log(count); // pending jobs
console.log(count); // pending job count
});
queue.count('run').then(function(count) {
console.log(count); // running jobs
console.log(count); // running job count
});
queue.count('fail').then(function(count) {
console.log(count); // failed jobs
console.log(count); // failed job count
});

@@ -147,19 +160,55 @@ ```

## queue.remove(id)
Returns a promise that will resolve when job is removed from redis (both job data and job queue).
### examples
```
queue.remove(1).then(function() {
// ...
});
```
## queue.stop()
Instructs queue worker to terminate gracefully on next loop. See events on how to monitor queue.
### examples
```
queue.stop();
```
## queue.restart()
Restarts the queue worker loop. See events on how to monitor queue.
### examples
```
queue.restart();
```
# Events
`decent` is also an instance of `EventEmitter`, so you can use `queue.on('event', func)` to listen for events as usual.
`decent` is an instance of `EventEmitter`, so you can use `queue.on('event', func)` as usual.
## redis client related
## Redis client related
- `queue.emit('client ready')`: client is ready. (redis client has buffer built-in, so this event is emitted as soon as redis client is started)
- `queue.emit('client error', err)`: client connection error.
- `queue.emit('client ready')`: client is ready. (redis client has buffer built-in, so this event is emitted as soon as redis client is started.)
- `queue.emit('client error', err)`: client connection experiences error.
- `queue.emit('client close')`: client connection has been closed.
- `queue.emit('client pressure', number)`: pending number of commands to run on server, useful for rate limiting.
- `queue.emit('client pressure', number)`: pending number of commands, useful for rate limiting.
## queue worker related
- `queue.emit('queue start')`: queue loop has started.
- `queue.emit('queue ok', job)`: queue worker has processed a `job`.
- `queue.emit('queue error', err)`: queue worker has failed to processed a job and thrown `err`.
- `queue.emit('queue exit', err)`: queue has terminated due to `err`.
- `queue.emit('queue stop')`: queue has stopped gracefully.
- `queue.emit('queue error', err)`: queue worker has failed to processed a job and thrown `err` (caught properly, so queue does not exit)
- `queue.emit('queue exit', err)`: queue loop has terminated due to `err`.
- `queue.emit('queue stop')`: queue loop has stopped gracefully.

@@ -179,8 +228,12 @@ ## queue client related

Feel feel to raise any issues or feature requests, note that we do intend to keep this API simple, and all changes must be well-tested.
# Future plan
- API for handling failed jobs
- API for re-queueing failed jobs
- Use-case examples
- Web UI
# License

@@ -187,0 +240,0 @@

@@ -17,3 +17,2 @@

var Promise = require('native-or-bluebird');
var Redis = require('fakeredis');

@@ -267,2 +266,14 @@ // global vars

it('should emit event on start', function() {
var stub = sinon.stub(queue, 'run');
stub.returns(Promise.resolve(true));
var spy = sinon.spy();
queue.on('queue start', spy);
return queue.listen().then(function() {
expect(spy).to.have.been.calledOnce;
});
});
it('should emit event on error exit', function() {

@@ -364,3 +375,3 @@ var error = new Error('some error');

describe('handleJob', function() {
it('should throw error if handler is missing', function() {
it('should reject if handler is missing', function() {
var job = {

@@ -376,6 +387,12 @@ id: 1

expect(function() { queue.handleJob(job) }).to.throw(Error);
var spy = sinon.spy();
queue.on('queue error', spy);
return queue.handleJob(job).then(function() {
expect(spy).to.have.been.calledOnce;
expect(spy.args[0][0]).to.be.an.instanceof(Error);
});
});
it('should run handler to process job', function() {
it('should support job timeout', function() {
var job = {

@@ -385,3 +402,5 @@ id: 1

, retry: 0
, timeout: 60
// don't do this, only integer are supported
// this is to fake timeout
, timeout: 0.01
};

@@ -392,12 +411,19 @@

queue.handler = function(job, done) { done() };
var stub = sinon.stub(queue, 'handler', function(job, done) {
setTimeout(function() {
done();
}, 50);
});
var spy = sinon.spy();
queue.handler = spy;
queue.on('queue error', spy);
return queue.handleJob(job).then(function() {
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(job);
expect(spy.args[0][0]).to.be.an.instanceof(Error);
});
});
it('should emit job done event', function() {
it('should support disabling job timeout', function() {
var job = {

@@ -407,3 +433,3 @@ id: 1

, retry: 0
, timeout: 60
, timeout: 0
};

@@ -414,3 +440,8 @@

queue.handler = function() {};
queue.handler = function(job, done) { done() };
var stub = sinon.stub(queue, 'handler', function(job, done) {
setTimeout(function() {
done();
}, 50);
});

@@ -421,8 +452,8 @@ var spy = sinon.spy();

return queue.handleJob(job).then(function() {
expect(stub).to.have.been.calledOnce;
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(job);
});
});
it('should move job to another queue if handler throw error', function() {
it('should catch handler error', function() {
var job = {

@@ -432,2 +463,27 @@ id: 1

, retry: 0
, timeout: 0
};
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
queue.handler = function(job, done) {
nonexist();
done();
};
var spy = sinon.spy();
queue.on('queue error', spy);
return queue.handleJob(job).then(function() {
expect(spy).to.have.been.calledOnce;
expect(spy.args[0][0]).to.be.an.instanceof(Error);
});
});
it('should allow worker function to trigger error', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60

@@ -439,18 +495,63 @@ };

queue.handler = function(job, done) { done() };
var error = new Error('some error');
var s0 = sinon.stub();
var s1 = sinon.stub(queue, 'moveJob');
s0.throws(error);
s1.returns(Promise.resolve(true));
var stub = sinon.stub(queue, 'handler', function(job, done) {
setTimeout(function() {
done(error);
}, 25);
});
queue.handler = s0;
var spy = sinon.spy();
queue.on('queue error', spy);
return queue.handleJob(job).then(function() {
expect(s0).to.have.been.calledOnce;
expect(s0).to.have.been.calledWith(job);
expect(s1).to.have.been.calledOnce;
expect(s1).to.have.been.calledWith(job);
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(error);
});
});
it('should run handler to process job', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
queue.handler = function(job, done) { done() };
var stub = sinon.stub(queue, 'handler', function(job, done) {
done();
});
return queue.handleJob(job).then(function() {
expect(stub).to.have.been.calledOnce;
expect(stub).to.have.been.calledWith(job);
});
});
it('should emit job done event', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
queue.handler = function(job, done) { done() };
var spy = sinon.spy();
queue.on('queue ok', spy);
return queue.handleJob(job).then(function() {
expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith(job);
});
});
it('should emit error from handler', function() {

@@ -467,10 +568,14 @@ var job = {

queue.handler = function(job, done) { done() };
var error = new Error('some error');
var s0 = sinon.stub();
var s0 = sinon.stub(queue, 'handler', function(job, done) {
setTimeout(function() {
done(error);
}, 25);
});
var s1 = sinon.stub(queue, 'moveJob');
var spy = sinon.spy();
s0.throws(error);
s1.returns(Promise.resolve(true));
queue.handler = s0;
var spy = sinon.spy();
queue.on('queue error', spy);

@@ -483,2 +588,31 @@

});
it('should move job to another queue if handler throw error', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
queue.client.hmset(queue.prefix + ':' + job.id, queue.toClient(job));
queue.client.lpush(queue.runQueue, job.id);
queue.handler = function(job, done) { done() };
var s0 = sinon.stub(queue, 'handler', function(job, done) {
setTimeout(function() {
done(new Error('some error'));
}, 25);
});
var s1 = sinon.stub(queue, 'moveJob');
s1.returns(Promise.resolve(true));
return queue.handleJob(job).then(function() {
expect(s0).to.have.been.calledOnce;
expect(s0).to.have.been.calledWith(job);
expect(s1).to.have.been.calledOnce;
expect(s1).to.have.been.calledWith(job);
});
});
});

@@ -558,3 +692,3 @@

it('should reject if move command return unexpected result', function() {
it('should reject if job id is missing', function() {
var job = {

@@ -569,2 +703,16 @@ id: 1

});
it('should reject if job data is partial', function() {
var job = {
id: 1
, data: '{"a":1}'
//, retry: 0
, timeout: 60
};
queue.client.hmset(queue.prefix + ':' + job.id, job);
queue.client.lpush(queue.runQueue, job.id);
return expect(queue.moveJob(job)).to.eventually.be.rejectedWith(Error);
});
});

@@ -609,3 +757,3 @@

it('should reject if remove command return unexpected result', function() {
it('should reject if job id is missing', function() {
var job = {

@@ -620,2 +768,15 @@ id: 1

});
it('should reject if job data is missing', function() {
var job = {
id: 1
, data: { a: 1 }
, retry: 0
, timeout: 60
};
queue.client.lpush(queue.runQueue, job.id);
return expect(queue.remove(job.id)).to.eventually.be.rejectedWith(Error);
});
});

@@ -630,2 +791,10 @@

describe('restart', function() {
it('should restart listener', function() {
var stub = sinon.stub(queue, 'listen');
queue.restart();
expect(stub).to.have.been.calledOnce;
});
});
describe('count', function() {

@@ -799,2 +968,4 @@ it('should return work queue job count by default', function() {

var p = queue.add({ a: 1 });
// TODO: trigger exec error handling, currently the error is from redis client
queue.client.stream.destroy();

@@ -884,9 +1055,14 @@

describe('real world', function() {
it('should process async jobs', function(done) {
var handler = sinon.spy();
describe('use case', function() {
it('should process jobs async', function(testEnd) {
var handler = function(job, done) {
setTimeout(function() {
done();
}, 25);
};
queue.worker(handler);
var spy = sinon.spy();
queue.on('queue ok', spy);
var s1 = sinon.spy(queue, 'handler');
var s2 = sinon.spy();
queue.on('queue ok', s2);

@@ -898,13 +1074,12 @@ setTimeout(function() {

queue.on('queue ok', function(job) {
expect(handler).to.have.been.calledOnce;
expect(spy).to.have.been.calledOnce;
expect(s1).to.have.been.calledOnce;
expect(s2).to.have.been.calledOnce;
expect(job.data).to.have.property('a', 1);
queue.removeAllListeners('queue ok');
queue.stop();
done();
testEnd();
});
});
it('should allow parallebl job creation', function() {
it('should allow parallel job creation', function() {
var p = Promise.all([

@@ -928,7 +1103,7 @@ queue.add({ a: 1 })

describe('wrap up', function() {
it('should not leave test data in redis', function(done) {
it('should not leave test data in redis', function(testEnd) {
queue.client.keys(queue.prefix + ':*', function(err, res) {
expect(err).to.be.null;
expect(res).to.be.empty;
done();
testEnd();
});

@@ -935,0 +1110,0 @@ });

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc