Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bee-queue

Package Overview
Dependencies
Maintainers
1
Versions
19
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bee-queue - npm Package Compare versions

Comparing version 0.1.0 to 0.2.0

bee-queue.png

26

examples/example.js
var Queue = require('../index');
var queue = new Queue('test');
queue.on('ready', function () {
queue.process(function (job, done) {
console.log('processing job ' + job.jobId);
console.log('the sum is: ' + (job.data.x + job.data.y));
done();
});
var reportResult = function (result) {
console.log('Result received: ' + result);
};
var reportEnqueued = function (err, job) {
console.log('enqueued job ' + job.jobId);
};
queue.process(function (job, done) {
console.log('Processing job ' + job.id);
return done(null, job.data.x + job.data.y);
});
queue.add({x: 1, y: 1}, reportEnqueued);
queue.add({x: 1, y: 2}, reportEnqueued);
setTimeout(queue.add.bind(queue, {x: 1, y: 3}, reportEnqueued), 500);
var job1 = queue.createJob({x: 1, y: 2}).save();
var job2 = queue.createJob({x: 2, y: 3}).save(function (err, job) {
console.log('Saved job ' + job.id);
});
job1.on('succeeded', reportResult);
job2.on('succeeded', reportResult);
module.exports = {
stalledInterval: 5000,
lockTimeout: 5000,
globalKeyPrefix: 'bq'
stallInterval: 5000,
prefix: 'bq',
isWorker: true,
getEvents: true,
sendEvents: true,
removeOnSuccess: false,
catchExceptions: false
};

@@ -11,4 +11,11 @@ // Effectively _.after

var defaultCb = function (err) {
if (err) {
throw err;
}
};
module.exports = {
barrier: barrier
barrier: barrier,
defaultCb: defaultCb
};

@@ -0,58 +1,97 @@

var events = require('events');
var util = require('util');
var helpers = require('./helpers');
var lua = require('./lua');
function Job(queue, jobId, data) {
function Job(queue, jobId, data, options) {
this.queue = queue;
this.jobId = jobId;
this.id = jobId;
this.progress = 0;
this.data = data || {};
this.lockKey = this.queue.toKey(this.jobId) + ':lock';
this.options = options || {};
this.status = 'created';
}
util.inherits(Job, events.EventEmitter);
Job.fromId = function (queue, jobId, cb) {
queue.client.get(queue.toKey(jobId), function (err, data) {
if (err) {
return cb(err);
}
queue.client.hget(queue.toKey('jobs'), jobId, function (err, data) {
/* istanbul ignore if */
if (err) return cb(err);
return cb(null, Job.fromData(queue, jobId, data));
});
};
try {
data = JSON.parse(data);
} catch (e) {
return cb(e);
}
Job.fromData = function (queue, jobId, data) {
// no need for try-catch here since we made the JSON ourselves in job#toData
data = JSON.parse(data);
var job = new Job(queue, jobId, data.data, data.options);
job.status = data.status;
return job;
};
return cb(null, new Job(queue, jobId, data));
Job.prototype.toData = function () {
return JSON.stringify({
data: this.data,
options: this.options,
status: this.status
});
};
var handleLock = function (cb) {
return function (err, result) {
if (err) {
return cb(err);
Job.prototype.save = function (cb) {
cb = cb || helpers.defaultCb;
var self = this;
this.queue.client.evalsha(lua.shas.addJob, 3,
this.queue.toKey('id'), this.queue.toKey('jobs'), this.queue.toKey('waiting'),
this.toData(),
function (err, jobId) {
/* istanbul ignore if */
if (err) return cb(err);
self.id = jobId;
self.queue.jobs[jobId] = self;
return cb(null, self);
}
return cb(null, result === 'OK');
};
);
return this;
};
Job.prototype.acquireLock = function (cb) {
this.queue.client.set(this.lockKey, this.queue.token, 'PX', this.queue.lockTimeout, 'NX', handleLock(cb));
Job.prototype.retries = function (n) {
if (n < 0) {
throw Error('Retries cannot be negative');
}
this.options.retries = n;
return this;
};
Job.prototype.renewLock = function (cb) {
this.queue.client.set(this.lockKey, this.queue.token, 'PX', this.queue.lockTimeout, handleLock(cb));
Job.prototype.timeout = function (ms) {
if (ms < 0) {
throw Error('Timeout cannot be negative');
}
this.options.timeout = ms;
return this;
};
Job.prototype.releaseLock = function (cb) {
this.queue.client.evalsha(lua.shas.releaseLock, 1, this.lockKey, this.queue.token, function (err, result) {
if (cb) {
if (err) {
return cb(err);
}
return cb(null, result === 1);
}
});
Job.prototype.reportProgress = function (progress, cb) {
// right now we just send the pubsub event
// might consider also updating the job hash for persistence
cb = cb || helpers.defaultCb;
progress = Number(progress);
if (progress < 0 || progress > 100) {
return process.nextTick(cb.bind(null, Error('Progress must be between 0 and 100')));
}
this.progress = progress;
this.queue.client.publish(this.queue.toKey('events'), JSON.stringify({
id: this.id,
event: 'progress',
data: progress
}), cb);
};
Job.prototype.remove = function (cb) {
this.queue.client.evalsha(lua.shas.removeJob, 1,
this.jobId,
this.queue.toKey(''),
cb = cb || helpers.defaultCb;
this.queue.client.evalsha(lua.shas.removeJob, 6,
this.queue.toKey('succeeded'), this.queue.toKey('failed'), this.queue.toKey('waiting'),
this.queue.toKey('active'), this.queue.toKey('stalling'), this.queue.toKey('jobs'),
this.id,
cb

@@ -63,20 +102,13 @@ );

Job.prototype.retry = function (cb) {
cb = cb || helpers.defaultCb;
this.queue.client.multi()
.srem(this.queue.toKey('failed'), this.jobId)
.lpush(this.queue.toKey('wait'), this.jobId)
.srem(this.queue.toKey('failed'), this.id)
.lpush(this.queue.toKey('waiting'), this.id)
.exec(cb);
};
Job.prototype.moveToSet = function (set, cb) {
this.queue.client.multi()
.lrem(this.queue.toKey('active'), 0, this.jobId)
.sadd(this.queue.toKey(set), this.jobId)
.exec(cb);
};
Job.prototype.isInSet = function (set, cb) {
this.queue.client.sismember(this.queue.toKey(set), this.jobId, function (err, result) {
if (err) {
return cb(err);
}
this.queue.client.sismember(this.queue.toKey(set), this.id, function (err, result) {
/* istanbul ignore if */
if (err) return cb(err);
return cb(null, result === 1);

@@ -83,0 +115,0 @@ });

var fs = require('fs');
var crypto = require('crypto');
var path = require('path');
var barrier = require('../helpers').barrier;

@@ -7,14 +8,18 @@

var shas = {};
var scriptsRead = false;
var cachedServers = {};
fs.readdirSync('./lib/lua')
.filter(function (file) {
return file.slice(-4) === '.lua';
}).forEach(function (file) {
var hash = crypto.createHash('sha1');
var key = file.slice(0, -4);
scripts[key] = fs.readFileSync('./lib/lua/' + file).toString();
hash.update(scripts[key]);
shas[key] = hash.digest('hex');
});
var readScripts = function () {
fs.readdirSync(__dirname)
.filter(function (file) {
return file.slice(-4) === '.lua';
}).forEach(function (file) {
var hash = crypto.createHash('sha1');
var key = file.slice(0, -4);
scripts[key] = fs.readFileSync(path.join(__dirname, file)).toString();
hash.update(scripts[key]);
shas[key] = hash.digest('hex');
});
scriptsRead = true;
};

@@ -26,2 +31,6 @@ var buildCache = function (serverKey, client, cb) {

if (!scriptsRead) {
readScripts();
}
var reportLoaded = barrier(Object.keys(shas).length, function () {

@@ -32,6 +41,8 @@ cachedServers[serverKey] = true;

// todo: could theoretically pipeline this, but it's pretty insignificant
Object.keys(shas).forEach(function (key) {
client.script('exists', shas[key], function (err, exists) {
/* istanbul ignore if */
if (err) {
throw Error('Could not build Lua script cache');
client.emit('error', 'Could not build Lua script cache');
} else if (exists[0] === 0) {

@@ -38,0 +49,0 @@ client.script('load', scripts[key], reportLoaded);

var redis = require('redis');
var events = require('events');
var util = require('util');
var crypto = require('crypto');
var barrier = require('./helpers').barrier;
var Job = require('./job');
var defaults = require('./defaults');
var lua = require('./lua');
var Job = require('./job');
var helpers = require('./helpers');
var barrier = helpers.barrier;

@@ -17,34 +16,59 @@ function Queue(name, settings) {

this.name = name;
this.paused = false;
this.jobs = {};
settings = settings || {};
settings.options = settings.options || {};
settings.db = settings.db || 0;
this.settings = {
redis: settings.redis || {},
stallInterval: typeof settings.stallInterval === 'number' ?
settings.stallInterval :
defaults.stallInterval,
keyPrefix: (settings.prefix || defaults.prefix) + ':' + this.name + ':'
};
var connParams;
if (settings.socket) {
connParams = [settings.socket, settings.options];
var boolProps = ['isWorker', 'getEvents', 'sendEvents', 'removeOnSuccess', 'catchExceptions'];
boolProps.forEach(function (prop) {
this.settings[prop] = typeof settings[prop] === 'boolean' ? settings[prop] : defaults[prop];
}.bind(this));
/* istanbul ignore if */
if (this.settings.redis.socket) {
this.settings.redis.params = [this.settings.redis.socket, this.settings.redis.options];
} else {
settings.port = settings.port || 6379;
settings.host = settings.host || '127.0.0.1';
connParams = [settings.port, settings.host, settings.options];
this.settings.redis.port = this.settings.redis.port || 6379;
this.settings.redis.host = this.settings.redis.host || '127.0.0.1';
this.settings.redis.params = [
this.settings.redis.port, this.settings.redis.host, this.settings.redis.options
];
}
this.settings.redis.db = this.settings.redis.db || 0;
this.name = name;
this.paused = false;
this.token = crypto.pseudoRandomBytes(16).toString('hex');
this.lockTimeout = settings.lockTimeout || defaults.lockTimeout;
this.globalKeyPrefix = settings.globalKeyPrefix || defaults.globalKeyPrefix;
this.keyPrefix = this.globalKeyPrefix + ':' + this.name + ':';
this.catchExceptions = settings.catchExceptions || false;
// Wait for Lua loading and client connection; bclient and eclient/subscribe if needed
var reportReady = barrier(
2 + this.settings.isWorker + this.settings.getEvents * 2,
this.emit.bind(this, 'ready')
);
this.client = redis.createClient.apply(redis, connParams);
this.bclient = redis.createClient.apply(redis, connParams);
var makeClient = function (clientName) {
this[clientName] = redis.createClient.apply(redis, this.settings.redis.params);
this[clientName].on('error', this.emit.bind(this, 'error'));
this[clientName].select(this.settings.redis.db, reportReady);
}.bind(this);
this.client.on('error', this.emit.bind(this, 'error'));
this.bclient.on('error', this.emit.bind(this, 'error'));
makeClient('client');
var serverKey = settings.socket || settings.host + ':' + settings.port;
var reportReady = barrier(3, this.emit.bind(this, 'ready'));
lua.buildCache(serverKey, this.client, reportReady);
this.client.select(settings.db, reportReady);
this.bclient.select(settings.db, reportReady);
if (this.settings.isWorker) {
makeClient('bclient');
}
if (this.settings.getEvents) {
makeClient('eclient');
this.eclient.subscribe(this.toKey('events'));
this.eclient.on('message', this.onMessage.bind(this));
this.eclient.on('subscribe', reportReady);
}
this.settings.serverKey = this.settings.redis.socket || this.settings.redis.host + ':' + this.settings.redis.port;
lua.buildCache(this.settings.serverKey, this.client, reportReady);
}

@@ -54,5 +78,30 @@

Queue.prototype.onMessage = function (channel, message) {
message = JSON.parse(message);
if (message.event === 'failed' || message.event === 'retrying') {
message.data = Error(message.data);
}
this.emit('job ' + message.event, message.id, message.data);
if (this.jobs[message.id]) {
if (message.event === 'progress') {
this.jobs[message.id].progress = message.data;
} else if (message.event === 'retrying') {
this.jobs[message.id].options.retries -= 1;
}
this.jobs[message.id].emit(message.event, message.data);
if (message.event === 'succeeded' || message.event === 'failed') {
delete this.jobs[message.id];
}
}
};
Queue.prototype.close = function (cb) {
cb = cb || helpers.defaultCb;
this.paused = true;
/* istanbul ignore next */
var closeTimeout = setTimeout(function () {

@@ -62,7 +111,17 @@ return cb(Error('Timed out closing redis connections'));

var handleEnd = barrier(2, function () {
var clients = [this.client];
if (this.settings.isWorker) {
clients.push(this.bclient);
}
if (this.settings.getEvents) {
clients.push(this.eclient);
}
var handleEnd = barrier(clients.length, function () {
clearTimeout(closeTimeout);
return cb(null);
});
[this.client, this.bclient].forEach(function (client) {
clients.forEach(function (client) {
client.end();

@@ -73,37 +132,51 @@ client.stream.on('close', handleEnd);

Queue.prototype.empty = function (cb) {
return this.client.evalsha(lua.shas.emptyQueue, 1, 'keyprefix', this.toKey(''), cb);
Queue.prototype.destroy = function (cb) {
cb = cb || helpers.defaultCb;
var keys = ['id', 'jobs', 'stallTime', 'stalling', 'waiting', 'active', 'succeeded', 'failed']
.map(this.toKey.bind(this));
this.client.del.apply(this.client, keys.concat(cb));
};
Queue.prototype.add = function (data, cb) {
var self = this;
this.client.evalsha(lua.shas.addJob, 1, this.toKey(''), JSON.stringify(data), function (err, jobId) {
if (cb) {
if (err) {
return cb(err);
}
return cb(null, new Job(self, jobId, data));
}
});
Queue.prototype.checkHealth = function (cb) {
this.client.multi()
.llen(this.toKey('waiting'))
.llen(this.toKey('active'))
.scard(this.toKey('succeeded'))
.scard(this.toKey('failed'))
.exec(function (err, results) {
/* istanbul ignore if */
if (err) return cb(err);
return cb(null, {
waiting: results[0],
active: results[1],
succeeded: results[2],
failed: results[3]
});
});
};
Queue.prototype.getNextJob = function (cb) {
Queue.prototype.createJob = function (data) {
return new Job(this, null, data);
};
Queue.prototype.getJob = function (jobId, cb) {
var self = this;
this.bclient.brpoplpush(this.toKey('wait'), this.toKey('active'), 0, function (err, jobId) {
if (err) {
return cb(err);
}
// todo maybe acquireLock here instead of waiting for runJob to renew it
// could narrow or even eliminate theoretical RC window
return Job.fromId(self, jobId, cb);
});
if (jobId in this.jobs) {
return process.nextTick(cb.bind(null, null, this.jobs[jobId]));
} else {
Job.fromId(this, jobId, function (err, job) {
/* istanbul ignore if */
if (err) return cb(err);
self.jobs[jobId] = job;
return cb(err, job);
});
}
};
Queue.prototype.runNextJob = function (cb) {
Queue.prototype.getNextJob = function (cb) {
var self = this;
this.getNextJob(function (err, job) {
if (err) {
return cb(err);
}
return self.runJob(job, cb);
this.bclient.brpoplpush(this.toKey('waiting'), this.toKey('active'), 0, function (err, jobId) {
/* istanbul ignore if */
if (err) return cb(err);
return Job.fromId(self, Number(jobId), cb);
});

@@ -114,36 +187,32 @@ };

var self = this;
var renewTimeout;
var psTimeout;
var handled = false;
var renewLock = function () {
job.renewLock(function () {
renewTimeout = setTimeout(renewLock, self.lockTimeout / 2);
var preventStalling = function () {
self.client.srem(self.toKey('stalling'), job.id, function () {
if (!handled) {
psTimeout = setTimeout(preventStalling, self.settings.stallInterval / 2);
}
});
};
renewLock();
preventStalling();
var handleOutcome = function (err, data) {
// todo somehow disallow calling this more than once
if (err) {
job.moveToSet('failed', function (errMove) {
clearTimeout(renewTimeout);
job.releaseLock();
if (errMove) {
return cb(errMove);
}
self.emit('failed', job, err);
return cb(err);
});
} else {
job.moveToSet('succeeded', function (errMove) {
clearTimeout(renewTimeout);
if (errMove) {
return cb(errMove);
}
self.emit('succeeded', job, data);
return cb();
});
// silently ignore any multiple calls
if (handled) {
return;
}
handled = true;
clearTimeout(psTimeout);
self.finishJob(err, data, job, cb);
};
if (this.catchExceptions) {
if (job.options.timeout) {
var msg = 'Job ' + job.id + ' timed out (' + job.options.timeout + ' ms)';
setTimeout(handleOutcome.bind(null, Error(msg)), job.options.timeout);
}
if (this.settings.catchExceptions) {
try {

@@ -159,3 +228,50 @@ this.handler(job, handleOutcome);

Queue.prototype.process = function (maxRunning, handler) {
Queue.prototype.finishJob = function (err, data, job, cb) {
var status = err ? 'failed' : 'succeeded';
var multi = this.client.multi()
.lrem(this.toKey('active'), 0, job.id)
.srem(this.toKey('stalling'), job.id);
var jobEvent = {
id: job.id,
event: status,
data: err ? err.message : data
};
if (status === 'failed') {
if (job.options.retries > 0) {
job.options.retries -= 1;
jobEvent.event = 'retrying';
multi.hset(this.toKey('jobs'), job.id, job.toData())
.lpush(this.toKey('waiting'), job.id);
} else {
multi.sadd(this.toKey('failed'), job.id);
}
} else {
if (this.settings.removeOnSuccess) {
multi.hdel(this.toKey('jobs'), job.id);
} else {
multi.sadd(this.toKey('succeeded'), job.id);
}
}
if (this.settings.sendEvents) {
multi.publish(this.toKey('events'), JSON.stringify(jobEvent));
}
multi.exec(function (errMulti) {
/* istanbul ignore if */
if (errMulti) {
return cb(errMulti);
}
return cb(null, status, err ? err : data);
});
};
Queue.prototype.process = function (concurrency, handler) {
if (!this.settings.isWorker) {
throw Error('Cannot call Queue.prototype.process on a non-worker');
}
if (this.handler) {

@@ -165,5 +281,5 @@ throw Error('Cannot call Queue.prototype.process twice');

if (typeof maxRunning === 'function') {
handler = maxRunning;
maxRunning = 1;
if (typeof concurrency === 'function') {
handler = concurrency;
concurrency = 1;
}

@@ -175,16 +291,16 @@

this.queued = 1;
this.maxRunning = maxRunning || 1;
this.concurrency = concurrency;
var jobTick = function () {
if (self.paused) {
self.queued -= 1;
return;
}
// invariant: in this code path, self.running < self.maxRunning, always
// after spoolup, self.running + self.queued === self.maxRunning
self.getNextJob(function (err, job) {
// todo decide how best to handle these two error cases
if (err) {
console.log('getNextJob failed: ', err);
// invariant: in this code path, self.running < self.concurrency, always
// after spoolup, self.running + self.queued === self.concurrency
self.getNextJob(function (getErr, job) {
/* istanbul ignore if */
if (getErr) {
self.emit('error', getErr);
return setImmediate(jobTick);

@@ -195,3 +311,3 @@ }

self.queued -= 1;
if (self.running + self.queued < self.maxRunning) {
if (self.running + self.queued < self.concurrency) {
self.queued += 1;

@@ -201,8 +317,13 @@ setImmediate(jobTick);

self.runJob(job, function (errRun) {
if (errRun) {
console.log('runJob failed: ', errRun);
}
self.runJob(job, function (err, status, result) {
self.running -= 1;
self.queued += 1;
/* istanbul ignore if */
if (err) {
self.emit('error', err);
} else {
self.emit(status, job, result);
}
setImmediate(jobTick);

@@ -214,2 +335,3 @@ });

var restartProcessing = function () {
// maybe need to increment queued here?
self.bclient.once('ready', jobTick);

@@ -220,13 +342,28 @@ };

this.resetStalledJobs(setImmediate.bind(null, jobTick));
this.checkStalledJobs(setImmediate.bind(null, jobTick));
};
Queue.prototype.resetStalledJobs = function (cb) {
this.client.evalsha(lua.shas.resetStalledJobs, 1, '', this.toKey(''), cb);
Queue.prototype.checkStalledJobs = function (interval, cb) {
var self = this;
cb = typeof interval === 'function' ? interval : cb || helpers.defaultCb;
this.client.evalsha(lua.shas.checkStalledJobs, 4,
this.toKey('stallTime'), this.toKey('stalling'), this.toKey('waiting'), this.toKey('active'),
Date.now(), this.settings.stallInterval, function (err) {
/* istanbul ignore if */
if (err) return cb(err);
if (typeof interval === 'number') {
setTimeout(self.checkStalledJobs.bind(self, interval, cb), interval);
}
return cb();
}
);
};
Queue.prototype.toKey = function (str) {
return this.keyPrefix + str;
return this.settings.keyPrefix + str;
};
module.exports = Queue;
{
"name": "bee-queue",
"version": "0.1.0",
"version": "0.2.0",
"description": "A simple, fast, robust job/task queue, backed by Redis.",
"main": "index.js",
"dependencies": {
"redis": "^0.12.1"
"redis": "0.12.1"
},

@@ -12,6 +12,7 @@ "devDependencies": {

"grunt": "~0.4.5",
"grunt-eslint": "^3.0.0",
"grunt-eslint": "^14.0.0",
"grunt-githooks": "^0.3.1",
"grunt-mocha-test": "^0.12.6",
"sinon": "^1.12.2"
"sinon": "^1.12.2",
"coveralls": "^2.11.2"
},

@@ -25,2 +26,5 @@ "scripts": {

"queue",
"worker",
"distributed",
"system",
"redis",

@@ -27,0 +31,0 @@ "lua"

@@ -1,78 +0,510 @@

# Bee Queue
<a name="top"></a>
![bee-queue logo](https://raw.githubusercontent.com/LewisJEllis/bee-queue/master/bee-queue.png)
[![NPM Version][npm-image]][npm-url] [![Build Status][travis-image]][travis-url] [![Coverage Status][coveralls-image]][coveralls-url]
A simple, fast, robust job/task queue, backed by Redis.
A simple, fast, robust job/task queue for Node.js, backed by Redis.
- Simple: ~500 LOC, and the only dependency is [node_redis](https://github.com/mranney/node_redis).
- Fast: maximizes throughput by minimizing Redis and network overhead. [Benchmarks](#benchmarks) well.
- Robust: designed with concurrency, atomicity, and failure in mind; 100% code coverage.
[![NPM Version][npm-image]][npm-url]
[![Build Status][travis-image]][travis-url]
```javascript
var Queue = require('bee-queue');
var queue = new Queue('example');
var job = queue.createJob({x: 2, y: 3}).save();
job.on('succeeded', function (result) {
console.log('Received result for job ' + job.id + ': ' + result);
});
// Process jobs from as many servers or processes as you like
queue.process(function (job, done) {
console.log('Processing job ' + job.id);
return done(null, job.data.x + job.data.y);
});
```
## Introduction
Bee-Queue is meant to power a distributed worker pool and was built with short, real-time jobs in mind. A web server can enqueue a job, wait for a worker process to complete it, and return its results within an HTTP request. Scaling is as simple as running more workers.
[Celery](http://www.celeryproject.org/), [Resque](https://github.com/resque/resque), [Kue](https://github.com/LearnBoost/kue), and [Bull](https://github.com/OptimalBits/bull) operate similarly, but are generally designed for longer background jobs, supporting things like job scheduling and prioritization, which Bee-Queue [currently does not](#missing-features). Bee-Queue can handle longer background jobs just fine, but they aren't [the primary focus](#motivation).
- Create, save, and process jobs
- Concurrent processing
- Job timeouts and retries
- Pass events via Pub/Sub
- Progress reporting
- Send job results back to creators
- Robust design
- Strives for all atomic operations
- Retries [stuck jobs](#under-the-hood)
- 100% code coverage
- Performance-focused
- Keeps [Redis usage](#under-the-hood) to the bare minimum
- Uses [Lua scripting](http://redis.io/commands/EVAL) and [pipelining](http://redis.io/topics/pipelining) to minimize network overhead
- [Benchmarks](#benchmarks) favorably against similar libraries
## Installation
```
npm install bee-queue
```
You'll also need [Redis 2.8+](http://redis.io/topics/quickstart) running somewhere.
# Table of Contents
- [Motivation](#motivation)
- [Benchmarks](#benchmarks)
- [Overview](#overview)
- [Creating Queues](#creating-queues)
- [Creating Jobs](#creating-jobs)
- [Processing Jobs](#processing-jobs)
- [Progress Reporting](#progress-reporting)
- [Job & Queue Events](#job-and-queue-events)
- [Stalled Jobs](#stalled-jobs)
- [API Reference](#api-reference)
- [Under The Hood](#under-the-hood)
- [Contributing](#contributing)
- [License](https://github.com/LewisJEllis/bee-queue/blob/master/LICENSE)
# Motivation
Celery is for Python, and Resque is for Ruby, but [Kue](https://github.com/LearnBoost/kue) and [Bull](https://github.com/OptimalBits/bull) already exist for Node, and they're good at what they do, so why does Bee-Queue also need to exist?
In short: we needed to mix and match things that Kue does well with things that Bull does well, and we needed to squeeze out more performance. There's also a [long version](https://github.com/LewisJEllis/bee-queue/wiki/Origin) with more details.
Bee-Queue starts by combining Bull's simplicity and robustness with Kue's ability to send events back to job creators, then focuses heavily on minimizing overhead, and finishes by being strict about [code quality](https://github.com/LewisJEllis/bee-queue/blob/master/.eslintrc) and [testing](https://coveralls.io/r/LewisJEllis/bee-queue?branch=master). It compromises on breadth of features, so there are certainly cases where Kue or Bull might be preferable (see [Contributing](#contributing)).
Bull and Kue do things really well and deserve a lot of credit. Bee-Queue borrows ideas from both, and Bull was an especially invaluable reference during initial development.
#### Why Bees?
Bee-Queue is like a bee because it:
- is small and simple
- is fast (bees can fly 20mph!)
- carries pollen (messages) between flowers (servers)
- something something "worker bees"
# Benchmarks
![benchmark chart](https://raw.githubusercontent.com/LewisJEllis/bee-queue/master/benchmark/benchmark-chart.png)
These basic benchmarks ran 10,000 jobs through each library, at varying levels of concurrency, with io.js 2.2.1 and Redis 3.0.2 running directly on a 13" MBPr. The numbers shown are averages of 3 runs; the raw data collected and code used are available in the benchmark folder.
For a quick idea of space efficiency, the following table contains Redis memory usage as reported by [INFO](http://redis.io/commands/INFO) after doing a [FLUSHALL](http://redis.io/commands/FLUSHALL), restarting Redis, and running a single basic 10k job benchmark:
| Library | Memory After | Memory Peak | Memory After Δ | Memory Peak Δ |
| --------- | ------------- | ----------- | -------------- | ------------- |
| Bee-Queue | 2.65MB | 2.73MB | 1.67MB | 1.75MB |
| Bull | 3.93MB | 5.09MB | 2.95MB | 4.11MB |
| Kue | 7.53MB | 7.86MB | 6.55MB | 6.88MB |
The Δ columns factor out the ~986KB of memory usage reported by Redis on a fresh startup.
# Overview
## Creating Queues
[Queue](#queue) objects are the starting point to everything this library does. To make one, we just need to give it a name, typically indicating the sort of job it will process:
```javascript
var Queue = require('bee-queue');
var queue = new Queue('test');
var addQueue = new Queue('addition');
```
Queues are very lightweight - the only significant overhead is connecting to Redis - so if you need to handle different types of jobs, just instantiate a queue for each:
```javascript
var subQueue = new Queue('subtraction', {
redis: {
host: 'somewhereElse'
},
isWorker: false
});
```
Here, we pass a `settings` object to specify an alternate Redis host and to indicate that this queue will only add jobs (not process them). See [Queue Settings](#settings) for more options.
queue.on('ready', function () {
queue.process(function (job, done) {
console.log('processing job ' + job.jobId);
console.log('the sum is: ' + (job.data.x + job.data.y));
done();
## Creating Jobs
Jobs are created using `Queue.createJob(data)`, which returns a [Job](#job) object storing arbitrary `data`.
Jobs have a chaining API with commands `.retries(n)` and `.timeout(ms)` for setting options, and `.save([cb])` to save the job into Redis and enqueue it for processing:
```javascript
var job = addQueue.createJob({x: 2, y: 3});
job.timeout(3000).retries(2).save(function (err, job) {
// job enqueued, job.id populated
});
```
Jobs can later be retrieved from Redis using [Queue.getJob](#queuegetjobjobid-cberr-job), but most use cases won't need this, and can instead use [Job and Queue Events](#job-and-queue-events).
## Processing Jobs
To start processing jobs, call `Queue.process` and provide a handler function:
```javascript
addQueue.process(function (job, done) {
console.log('Processing job ' + job.id);
return done(null, job.data.x + job.data.y);
});
```
The handler function is given the job it needs to process, including `job.data` from when the job was created. It should then pass results to the `done` callback. For more on handlers, see [Queue.process](queueprocessconcurrency-handlerjob-done).
`.process` can only be called once per Queue instance, but we can process on as many instances as we like, spanning multiple processes or servers, as long as they all connect to the same Redis instance. From this, we can easily make a worker pool of machines who all run the same code and spend their lives processing our jobs, no matter where those jobs are created.
`.process` can also take a concurrency parameter. If your jobs spend most of their time just waiting on external resources, you might want each processor instance to handle 10 at a time:
```javascript
var baseUrl = 'http://www.google.com/search?q=';
subQueue.process(10, function (job, done) {
http.get(baseUrl + job.data.x + '-' + job.data.y, function (res) {
// parse the difference out of the response...
return done(null, difference);
});
});
```
var reportEnqueued = function (err, job) {
console.log('enqueued job ' + job.jobId);
};
## Progress Reporting
Handlers can send progress reports, which will be received as events on the original job instance:
```javascript
var job = addQueue.createJob({x: 2, y: 3}).save();
job.on('progress', function (progress) {
console.log('Job ' + job.id + ' reported progress: ' + progress + '%');
});
queue.add({x: 1, y: 1}, reportEnqueued);
queue.add({x: 1, y: 2}, reportEnqueued);
setTimeout(queue.add.bind(queue, {x: 1, y: 3}, reportEnqueued), 500);
addQueue.process(function (job, done) {
// do some work
job.reportProgress(30);
// do more work
job.reportProgress(80);
// do the rest
done();
});
```
Just like `.process`, these `progress` events work across multiple processes or servers; the job instance will receive the progress event no matter where processing happens.
Bee Queue: a simple, fast, robust job/task queue, backed by Redis.
## Job and Queue Events
- Simple: ~400 LOC, and the only dependency is [node-redis](https://github.com/mranney/node_redis).
- Fast: uses Lua scripting and pipelining whenever possible; numbers, benchmarks, etc to come.
- Robust: well-tested, designed to withstand failures and avoid race conditions.
There are three classes of events emitted by Bee-Queue objects: [Queue Local events](#queue-local-events), [Queue PubSub events](#queue-pubsub-events), and [Job events](#job-events). The linked API Reference sections provide a more complete overview of each.
Currently a bit raw, but 1.0.0 (and thorough docs/explanations/benchmarks/comparisons) should come soon.
Progress reporting, demonstrated above, happens via Job events. Jobs also emit `succeeded` events, which we've seen in the [opening example](#top), and `failed` and `retrying` events.
Heavily inspired by [Bull](https://github.com/OptimalBits/bull), which was an invaluable reference during development.
Queue PubSub events correspond directly to Job events: `job succeeded`, `job retrying`, `job failed`, and `job progress`. These events fire from all queue instances and for all jobs on the queue.
Why Bees? Bee Queue is like a bee because it:
- carries pollen (messages) between flowers (servers)
- is small and simple
- is fast (bees can fly 20mph!)
- doesn't sting you as much as wasps do
Queue local events include `ready` and `error` on all queue instances, and `succeeded`, `retrying`, and `failed` on worker queues corresponding to the PubSub events being sent out.
# Installation
Note that Job events become unreliable across process restarts, since the queue's reference to the associated job object will be lost. Queue PubSub events are thus potentially more reliable, but Job events are more convenient in places like HTTP requests where a process restart loses state anyway.
## Stalling Jobs
Bee-Queue attempts to provide ["at least once delivery"](http://www.cloudcomputingpatterns.org/At-least-once_Delivery), in the sense that any job enqueued should be processed at least once, even if a worker crashes, gets disconnected, or otherwise fails to confirm completion of the job.
To make this happen, workers periodically phone home to Redis about each job they're working on, just to say "I'm still working on this and I haven't stalled, so you don't need to retry it." The [`checkStalledJobs`](#queue-prototype-checkStalledJobs-cb) method finds any active jobs whose workers have gone silent (not phoned home for at least [`stallInterval`](#settings) ms), assumes they have stalled, and re-enqueues them.
# API Reference
## Queue
### Settings
The default Queue settings are:
```javascript
var queue = Queue('test', {
prefix: 'bq',
stallInterval: 5000,
redis: {
host: '127.0.0.1',
port: 6379,
db: 0,
options: {}
},
getEvents: true,
isWorker: true,
sendEvents: true,
removeOnSuccess: false,
catchExceptions: false
});
```
npm install bee-queue
The `settings` fields are:
- `prefix`: string, default `bq`. Useful if the `bq:` namespace is, for whatever reason, unavailable on your redis database.
- `stallInterval`: number, ms; the length of the window in which workers must report that they aren't stalling. Higher values will reduce Redis/network overhead, but if a worker stalls, it will take longer before its stalled job(s) will be retried.
- `redis`: object, specifies how to connect to Redis.
- `host`: string, Redis host.
- `port`: number, Redis port.
- `socket`: string, Redis socket to be used instead of a host and port.
- `db`: number, Redis [DB index](http://redis.io/commands/SELECT).
- `options`: options object, passed to [node_redis](https://github.com/mranney/node_redis#rediscreateclient).
- `isWorker`: boolean, default true. Disable if this queue will not process jobs.
- `getEvents`: boolean, default true. Disable if this queue does not need to receive job events.
- `sendEvents`: boolean, default true. Disable if this worker does not need to send job events back to other queues.
- `removeOnSuccess`: boolean, default false. Enable to keep memory usage down by automatically removing jobs from Redis when they succeed.
- `catchExceptions`: boolean, default false. Only enable if you want exceptions thrown by the [handler](#queueprocessconcurrency-handlerjob-done) to be caught by Bee-Queue and interpreted as job failures. Communicating failures via `done(err)` is preferred.
### Properties
- `name`: string, the name passed to the constructor.
- `keyPrefix`: string, the prefix used for all Redis keys associated with this queue.
- `paused`: boolean, whether the queue instance is paused. Only true if the queue is in the process of closing.
- `settings`: object; the settings determined between those passed and the defaults
### Queue Local Events
#### ready
```javascript
queue.on('ready', function () {
console.log('queue now ready to start doing things');
});
```
The queue has connected to Redis and ensured that the [Lua scripts are cached](http://redis.io/commands/script-load). You can often get away without checking for this event, but it's a good idea to wait for it in case the Redis host didn't have the scripts cached beforehand; if you try to enqueue jobs when the scripts are not yet cached, you may run into a Redis error.
# Methods
#### error
```javascript
queue.on('error', function (err) {
console.log('A queue error happened: ' + err.message);
});
```
Queue(name[, settings])
Queue.add(data[, cb(err, job)])
Queue.process([maxRunning,] handler(job, done(err)))
Any Redis errors are re-emitted from the Queue.
#### succeeded
```javascript
queue.on('succeeded', function (job, result) {
console.log('Job ' + job.id + ' succeeded with result: ' + result);
});
```
Concurrency is an integer denoting how many jobs can be run at once.
This queue has successfully processed `job`. If `result` is defined, the handler called `done(null, result)`.
#### retrying
```javascript
queue.on('retrying', function (job, err) {
console.log('Job ' + job.id + ' failed with error ' + err.message ' but is being retried!');
});
```
This queue has processed `job`, but it reported a failure and has been re-enqueued for another attempt. `job.options.retries` has been decremented accordingly.
The constructor's settings argument is an object whcih can take the following fields:
- `host`: redis host
- `port`: redis port
- `socket`: provide a socket path instead of a host and port
- `db`: redis DB index
- `options`: options object for [node-redis](https://github.com/mranney/node_redis#rediscreateclient)
- `lockTimeout`: ms, default 5000. The experation time of a processor's lock on a job; higher values will reduce the amount of relocking, but if a processor gets stuck, it will take longer before its stalled job gets retried.
- `globalKeyPrefix`: string, default 'bq'. Configurable just in case the `bq:` namespace is, for whatever reason, unavailable on your redis database.
- `catchExceptions`: boolean, default false. Whether to catch exceptions thrown by the handler given to `Queue.process`; only set to true if you must rely on throwing exceptions and having them be caught. Otherwise, communicate errors via `done(err)`.
#### failed
```javascript
queue.on('failed', function (job, err) {
console.log('Job ' + job.id + ' failed with error ' + err.message);
});
```
This queue has processed `job`, but its handler reported a failure with `done(err)`.
The process function's `maxRunning` parameter sets the maximum number of simultaneously active jobs, defaulting to 1.
### Queue PubSub Events
These events are all reported by some worker queue (with `sendEvents` enabled) and sent as Redis Pub/Sub messages back to any queues listening for them (with `getEvents` enabled). This means that listening for these events is effectively a monitor for all activity by all workers on the queue.
The handler's `done` callback should only be called once. The handler function should never throw an exception, unless `catchExceptions` has been enabled.
If the `jobId` of an event is for a job that was created by that queue instance, a corresponding [job event](#job-events) will be emitted from that job object.
It's analogous to [kue's processing concurrency](https://github.com/LearnBoost/kue#processing-concurrency). However, `bee-queue` will use only two Redis connections, while kue (and the equivalent in bull, using `maxRunning` instances of the same queue) will use `2 * maxRunning` connections.
Note that Queue PubSub events pass the `jobId`, but do not have a reference to the job object, since that job might have originally been created by some other queue in some other process. [Job events](#job-events) are emitted only in the process that created the job, and are emitted from the job object itself.
#### job succeeded
```javascript
queue.on('job succeeded', function (jobId, result) {
console.log('Job ' + job.id + ' succeeded with result: ' + result);
});
```
Some worker has successfully processed job `jobId`. If `result` is defined, the handler called `done(null, result)`.
#### job retrying
```javascript
queue.on('job retrying', function (jobId, err) {
console.log('Job ' + jobId + ' failed with error ' + err.message ' but is being retried!');
});
```
Some worker has processed job `jobId`, but it reported a failure and has been re-enqueued for another attempt.
#### job failed
```javascript
queue.on('job failed', function (jobId, err) {
console.log('Job ' + jobId + ' failed with error ' + err.message);
});
```
Some worker has processed `job`, but its handler reported a failure with `done(err)`.
#### job progress
```javascript
queue.on('job progress', function (jobId, progress) {
console.log('Job ' + jobId + ' reported progress: ' + progress + '%');
});
```
Some worker is processing job `jobId`, and it sent a [progress report](#jobreportprogressn) of `progress` percent.
### Methods
#### Queue(name, [settings])
Used to instantiate a new queue; opens connections to Redis.
#### Queue.prototype.createJob(data)
```javascript
var job = queue.createJob({...});
```
Returns a new [Job object](#job) with the associated [user data](#job).
#### Queue.prototype.getJob(jobId, cb)
```javascript
queue.getJob(3, function (err, job) {
console.log('Job 3 has status ' + job.status);
});
```
Looks up a job by its `jobId`. The returned job will emit events if `getEvents` is true.
Be careful with this method; most potential uses would be better served by job events on already-existing job instances. Using this method indiscriminately can lead to increasing memory usage, as each queue maintains a table of all associated jobs in order to dispatch events.
#### Queue.prototype.process([concurrency], handler(job, done))
Begins processing jobs with the provided handler function.
This function should only be called once, and should never be called on a queue where `isWorker` is false.
The optional `concurrency` parameter sets the maximum number of simultaneously active jobs for this processor. It defaults to 1.
The handler function should:
- Call `done` exactly once
- Use `done(err)` to indicate job failure
- Use `done()` or `done(null, result)` to indicate job success
- `result` must be JSON-serializable (for `JSON.stringify`)
- Never throw an exception, unless `catchExceptions` has been enabled.
- Never ever [block](http://www.slideshare.net/async_io/practical-use-of-mongodb-for-nodejs/47) [the](http://blog.mixu.net/2011/02/01/understanding-the-node-js-event-loop/) [event](http://strongloop.com/strongblog/node-js-performance-event-loop-monitoring/) [loop](http://zef.me/blog/4561/node-js-and-the-case-of-the-blocked-event-loop) (for very long). If you do, the stall detection might think the job stalled, when it was really just blocking the event loop.
#### Queue.prototype.checkStalledJobs([interval], [cb])
Checks for jobs that appear to be stalling and thus need to be retried, then re-enqueues them.
```javascript
queue.checkStalledJobs(5000, function (err) {
console.log('Checked stalled jobs'); // prints every 5000 ms
});
```
What happens after the check is determined by the parameters provided:
- `cb` only: `cb` is called
- `interval` only: a timeout is set to call the method again in `interval` ms
- `cb` and `interval`: a timeout is set, then `cb` is called
Bee-Queue automatically calls this method once when a worker begins processing, so it will check once if a worker process restarts. You should also make your own call with an interval parameter to make the check happen repeatedly over time; see [Under the hood](#under-the-hood) for an explanation why.
The maximum delay from when a job stalls until it will be retried is roughly `stallInterval` + `interval`, so to minimize that delay without calling `checkStalledJobs` unnecessarily often, set `interval` to be the same or a bit shorter than `stallInterval`. A good system-wide average frequency for the check is every 0.5-10 seconds, depending on how time-sensitive your jobs are in case of failure.
#### Queue.prototype.close([cb])
Closes the queue's connections to Redis.
## Job
### Properties
- `id`: number, Job ID unique to each job. Not populated until `.save` calls back.
- `data`: object; user data associated with the job. It should:
- Be JSON-serializable (for `JSON.stringify`)
- Never be used to pass large pieces of data (100kB+)
- Ideally be as small as possible (1kB or less)
- `options`: object used by Bee-Queue to store timeout, retries, etc.
- Do not modify directly; use job methods instead.
- `queue`: the Queue responsible for this instance of the job. This is either:
- the queue that called `createJob` to make the job
- the queue that called `process` to process it
- `progress`: number; progress between 0 and 100, as reported by `reportProgress`.
### Job Events
These are all Pub/Sub events like [Queue PubSub events](#pubsub-events) and are disabled when `getEvents` is false.
#### succeeded
```javascript
var job = queue.createJob({...}).save();
job.on('succeeded', function (result) {
console.log('Job ' + job.id + ' succeeded with result: ' + result);
});
```
The job has succeeded. If `result` is defined, the handler called `done(null, result)`.
#### retrying
```javascript
job.on('retrying', function (err) {
console.log('Job ' + job.id + ' failed with error ' + err.message ' but is being retried!');
});
```
The job has failed, but it is being automatically re-enqueued for another attempt. `job.options.retries` has been decremented accordingly.
#### failed
```javascript
job.on('failed', function (err) {
console.log('Job ' + job.id + ' failed with error ' + err.message);
});
```
The job has failed, and is not being retried.
#### progress
```javascript
job.on('progress', function (progress) {
console.log('Job ' + job.id + ' reported progress: ' + progress + '%');
});
```
The job has sent a [progress report](#jobreportprogressn) of `progress` percent.
### Methods
#### Job.prototype.retries(n)
```javascript
var job = queue.createJob({...}).retries(3).save();
```
Sets how many times the job should be automatically retried in case of failure.
Stored in `job.options.retries` and decremented each time the job is retried.
Defaults to 0.
#### Job.prototype.timeout(ms)
```javascript
var job = queue.createJob({...}).timeout(10000).save();
```
Sets a job runtime timeout; if the job's handler function takes longer than the timeout to call `done`, the worker assumes the job has failed and reports it as such.
Defaults to no timeout.
#### Job.prototype.save([cb])
```javascript
var job = queue.createJob({...}).save(function (err, job) {
console.log('Saved job ' + job.id);
});
```
Saves a job, queueing it up for processing. After the callback fires, `job.id` will be populated.
#### Job.prototype.reportProgress(n)
```javascript
queue.process(function (job, done) {
...
job.reportProgress(10);
...
job.reportProgress(50);
...
});
```
Reports job progress when called within a handler function. Causes a `progress` event to be emitted.
### Defaults
All methods with an optional callback field will use the following default:
```javascript
var defaultCb = function (err) {
if (err) throw err;
};
```
Defaults for Queue `settings` live in `lib/defaults.js`. Changing that file will change Bee-Queue's default behavior.
# Under the hood
Each Queue uses the following Redis keys:
- `bq:name:id`: Integer, incremented to determine the next Job ID.
- `bq:name:jobs`: Hash from Job ID to a JSON string of its data and options.
- `bq:name:waiting`: List of IDs of jobs waiting to be processed.
- `bq:name:active`: List of IDs jobs currently being processed.
- `bq:name:succeeded`: Set of IDs of jobs which succeeded.
- `bq:name:failed`: Set of IDs of jobs which failed.
- `bq:name:stalling`: Set of IDs of jobs which haven't 'checked in' during this interval.
- `bq:name:events`: Pub/Sub channel for workers to send out job results.
Bee-Queue is non-polling, so idle workers are listening to receive jobs as soon as they're enqueued to Redis. This is powered by [brpoplpush](http://redis.io/commands/BRPOPLPUSH), which is used to move jobs from the waiting list to the active list. Bee-Queue generally follows the "Reliable Queue" pattern described on the [rpoplpush page](http://redis.io/commands/rpoplpush).
The `isWorker` [setting](#settings) creates an extra Redis connection dedicated to `brpoplpush`, while `getEvents` creates one dedicated to receiving Pub/Sub events. As such, these settings should be disabled if you don't need them; in most cases, only one of them needs to be enabled.
The stalling set is a snapshot of the active list from the beginning of the latest stall interval. During each stalling interval, workers remove their job IDs from the stalling set, so at the end of an interval, any jobs whose IDs are left in the stalling set have missed their window (stalled) and need to be rerun. When `checkStalledJobs` runs, it re-enqueues any jobs left in the stalling set (to the waiting list), then takes a snapshot of the active list and stores it in the stalling set.
Bee-Queue requires the user to start the repeated checks on their own because if we did it automatically, every queue instance in the system would be doing the check. Checking from all instances is less efficient and provides weaker guarantees than just checking from one or two. For example, a `checkStalledJobs` interval of 5000ms running on 10 processes would average one check every 500ms, but would only guarantee a check every 5000ms. Two instances checking every 1000ms would also average one check every 500ms, but would be more well-distributed across time and would guarantee a check every 1000ms. Though the check is not expensive, and it doesn't hurt to do it extremely often, avoiding needless inefficiency is a main point of this library, so we leave it to the user to control exactly which processes are doing the check and how often.
# Contributing
Pull requests are welcome; just make sure `grunt test` passes.
Pull requests are welcome; just make sure `grunt test` passes. For significant changes, open an issue for discussion first.
Some significant non-features include:
- Job scheduling: Kue and Bull do this.
- Worker tracking: Kue does this.
- All-workers pause-resume: Bull does this.
- Web interface: Kue has a nice one built in, and someone made [one for Bull](https://github.com/ShaneK/Matador).
- Job priority: multiple queues get the job done in simple cases, but Kue has first-class support. Bull provides a wrapper around multiple queues.
Some of these could be worthwhile additions; please comment if you're interested in using or helping implement them!
You'll need a local redis server to run the tests. Note that running them will delete any keys that start with `bq:test:`.

@@ -84,1 +516,3 @@

[travis-url]: https://travis-ci.org/LewisJEllis/bee-queue
[coveralls-image]: https://coveralls.io/repos/LewisJEllis/bee-queue/badge.svg?branch=master
[coveralls-url]: https://coveralls.io/r/LewisJEllis/bee-queue?branch=master

@@ -11,4 +11,4 @@ /*eslint-disable no-shadow, handle-callback-err */

var queue = new Queue('test');
var data;
var job;
var data = {foo: 'bar'};
var options = {test: 1};

@@ -25,40 +25,25 @@ var clearKeys = function (done) {

var makeJob = function (cb) {
var job = queue.createJob(data);
job.options = options;
job.save(cb);
};
before(clearKeys);
after(clearKeys);
afterEach(clearKeys);
beforeEach(function (done) {
data = {foo: 'bar'};
return queue.add(data, function (err, newJob) {
job = newJob;
done();
});
});
describe('Constructor', function () {
it('creates a job', function () {
assert.ok(job, 'fails to return a job');
assert.property(job, 'jobId', 'job has no jobId');
assert.property(job, 'data', 'job has no data');
});
it('saves the job in redis', function (done) {
Job.fromId(queue, job.jobId, function (err, storedJob) {
assert.ok(storedJob, 'fails to return a job');
assert.property(storedJob, 'jobId', 'stored job has no jobId');
assert.property(storedJob, 'data', 'stored job has no data');
assert.deepEqual(storedJob.data, data, 'stored job data is wrong');
done();
makeJob(function (err, job) {
assert.isNull(err);
assert.ok(job, 'fails to return a job');
assert.property(job, 'id', 'job has no id');
assert.property(job, 'data', 'job has no data');
});
});
});
describe('remove', function () {
it('removes the job from redis', function (done) {
job.remove(function (err) {
it('creates a job without data', function () {
queue.createJob().save(function (err, job) {
assert.isNull(err);
queue.client.get(queue.toKey(job.jobId), function (err, results) {
assert.isNull(err);
assert.isNull(results);
done();
});
assert.deepEqual(job.data, {});
});

@@ -68,37 +53,36 @@ });

describe('moveToSet', function () {
var markJobTest = function (status) {
return function (done) {
job.isInSet(status, function (err, isMember) {
assert.isNull(err);
assert.isFalse(isMember);
job.moveToSet(status, function (err) {
assert.isNull(err);
job.isInSet(status, function (err, isMember) {
assert.isNull(err);
assert.isTrue(isMember);
done();
});
});
});
};
};
describe('Chaining', function () {
it('sets retries', function () {
var job = queue.createJob({foo: 'bar'}).retries(2);
assert.strictEqual(job.options.retries, 2);
});
it('marks the job as succeeded', markJobTest('succeeded'));
it('marks the job as failed', markJobTest('failed'));
});
it('rejects invalid retries count', function () {
try {
queue.createJob({foo: 'bar'}).retries(-1);
} catch (err) {
assert.strictEqual(err.message, 'Retries cannot be negative');
}
});
describe('Locking', function () {
it('acquires a lock', function (done) {
return job.acquireLock(function (err, acquired) {
assert.isTrue(acquired);
done();
});
it('sets timeout', function () {
var job = queue.createJob({foo: 'bar'}).timeout(5000);
assert.strictEqual(job.options.timeout, 5000);
});
it('cannot acquire existing lock', function (done) {
return job.acquireLock(function (err, acquired) {
assert.isTrue(acquired);
job.acquireLock(function (err, acquired) {
assert.isFalse(acquired);
it('rejects invalid timeout', function () {
try {
queue.createJob({foo: 'bar'}).timeout(-1);
} catch (err) {
assert.strictEqual(err.message, 'Timeout cannot be negative');
}
});
it('saves the job in redis', function (done) {
makeJob(function (err, job) {
Job.fromId(queue, job.id, function (err, storedJob) {
assert.ok(storedJob);
assert.property(storedJob, 'id');
assert.deepEqual(storedJob.data, data);
assert.deepEqual(storedJob.options, options);
done();

@@ -108,8 +92,9 @@ });

});
});
it('can renew a previously taken lock', function (done) {
return job.acquireLock(function (err, acquired) {
assert.isTrue(acquired);
job.renewLock(function (err, renewed) {
assert.isTrue(renewed);
describe('Progress', function () {
it('rejects out-of-bounds progress', function (done) {
makeJob(function (err, job) {
job.reportProgress(101, function (err) {
assert.strictEqual(err.message, 'Progress must be between 0 and 100');
done();

@@ -119,16 +104,14 @@ });

});
});
it('can renew a lock without acquiring first', function (done) {
return job.renewLock(function (err, renewed) {
assert.isTrue(renewed);
done();
});
});
it('can release a lock', function (done) {
return job.acquireLock(function (err, acquired) {
assert.isTrue(acquired);
job.releaseLock(function (err, released) {
assert.isTrue(released);
done();
describe('Remove', function () {
it('removes the job from redis', function (done) {
makeJob(function (err, job) {
job.remove(function (err) {
assert.isNull(err);
queue.client.hget(queue.toKey('jobs'), job.id, function (err, results) {
assert.isNull(err);
assert.isNull(results);
done();
});
});

@@ -138,10 +121,12 @@ });

it('cannot release a lock with a different token', function (done) {
return job.acquireLock(function (err, acquired) {
assert.isTrue(acquired);
job.queue.token = 'something else';
job.releaseLock(function (err, released) {
assert.isFalse(released);
done();
});
it('should work without a callback', function (done) {
makeJob(function (err, job) {
job.remove();
setTimeout(function () {
queue.client.hget(queue.toKey('jobs'), job.id, function (err, results) {
assert.isNull(err);
assert.isNull(results);
done();
});
}, 20);
});

@@ -148,0 +133,0 @@ });

@@ -35,16 +35,71 @@ /*eslint-disable no-shadow, handle-callback-err */

describe('.close', function () {
it('should call end on the clients', function (done) {
describe('Connection', function () {
describe('Close', function () {
it('should call end on the clients', function (done) {
queue = Queue('test');
var clientSpy = sinon.spy(queue.client, 'end');
var bclientSpy = sinon.spy(queue.bclient, 'end');
var eclientSpy = sinon.spy(queue.eclient, 'end');
queue.on('ready', function () {
queue.close(function (err) {
assert.isNull(err);
assert.isTrue(clientSpy.calledOnce);
assert.isTrue(bclientSpy.calledOnce);
assert.isTrue(eclientSpy.calledOnce);
queue = undefined;
done();
});
});
});
it('should work without a callback', function (done) {
queue = Queue('test');
queue.on('ready', function () {
queue.close();
setTimeout(function () {
assert.isFalse(queue.client.connected);
queue = undefined;
done();
}, 20);
});
});
});
it('should recover from a connection loss', function (done) {
queue = Queue('test');
var clientSpy = sinon.spy(queue.client, 'end');
var bclientSpy = sinon.spy(queue.bclient, 'end');
queue.on('error', function () {
// Prevent errors from bubbling up into exceptions
});
queue.close(function (err) {
assert.isNull(err);
assert.isTrue(clientSpy.calledOnce);
assert.isTrue(bclientSpy.calledOnce);
queue = undefined;
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone();
done();
});
queue.bclient.stream.end();
queue.bclient.emit('error', new Error('ECONNRESET'));
queue.createJob({foo: 'bar'}).save();
});
it('should reconnect when the blocking client triggers an "end" event', function (done) {
queue = Queue('test');
var jobSpy = sinon.spy(queue, 'getNextJob');
queue.process(function (job, jobDone) {
// First getNextJob fails on the disconnect, second should succeed
assert.strictEqual(jobSpy.callCount, 2);
jobDone();
done();
});
// Not called at all yet because queue.process uses setImmediate
assert.strictEqual(jobSpy.callCount, 0);
queue.createJob({foo: 'bar'}).save();
queue.bclient.emit('end');
});
});

@@ -68,4 +123,6 @@

queue = Queue('test', {
host: 'localhost',
db: 1
redis: {
host: 'localhost',
db: 1
}
});

@@ -76,4 +133,2 @@

assert.strictEqual(queue.bclient.connectionOption.host, 'localhost');
assert.strictEqual(queue.client.connectionOption.port, 6379);
assert.strictEqual(queue.bclient.connectionOption.port, 6379);
assert.strictEqual(queue.client.selected_db, 1);

@@ -84,374 +139,977 @@ assert.strictEqual(queue.bclient.selected_db, 1);

});
it('creates a queue with isWorker false', function (done) {
queue = Queue('test', {
isWorker: false
});
queue.once('ready', function () {
assert.strictEqual(queue.client.connectionOption.host, '127.0.0.1');
assert.isUndefined(queue.bclient);
done();
});
});
});
it('should recover from a connection loss', function (done) {
it('adds a job with correct prefix', function (done) {
queue = Queue('test');
queue.on('error', function () {
// Prevent errors from bubbling up into exceptions
queue.createJob({foo: 'bar'}).save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
queue.client.hget('bq:test:jobs', job.id, function (getErr, jobData) {
assert.isNull(getErr);
assert.strictEqual(jobData, job.toData());
done();
});
});
});
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone();
done();
describe('Health Check', function () {
it('reports a waiting job', function (done) {
queue = Queue('test');
var job = queue.createJob({foo: 'bar'});
job.save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
queue.checkHealth(function (healthErr, counts) {
assert.isNull(healthErr);
assert.strictEqual(counts.waiting, 1);
done();
});
});
});
queue.bclient.stream.end();
queue.bclient.emit('error', new Error('ECONNRESET'));
it('reports an active job', function (done) {
queue = Queue('test');
queue.add({'foo': 'bar'});
});
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
queue.checkHealth(function (healthErr, counts) {
assert.isNull(healthErr);
assert.strictEqual(counts.active, 1);
jobDone();
done();
});
});
var job = queue.createJob({foo: 'bar'});
job.save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
});
});
it('should reconnect when the blocking client triggers an "end" event', function (done) {
queue = Queue('test');
it('reports a succeeded job', function (done) {
queue = Queue('test');
var jobSpy = sinon.spy(queue, 'getNextJob');
queue.process(function (job, jobDone) {
// First getNextJob fails on the disconnect, second should succeed
assert.strictEqual(jobSpy.callCount, 2);
jobDone();
done();
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone();
});
var job = queue.createJob({foo: 'bar'});
job.save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
});
queue.on('succeeded', function (job) {
assert.ok(job);
queue.checkHealth(function (healthErr, counts) {
assert.isNull(healthErr);
assert.strictEqual(counts.succeeded, 1);
done();
});
});
});
// Not called at all yet because queue.process uses setImmediate
assert.strictEqual(jobSpy.callCount, 0);
it('reports a failed job', function (done) {
queue = Queue('test');
queue.add({'foo': 'bar'}, function () {});
queue.bclient.emit('end');
});
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone(Error('failed!'));
});
it('processes a job', function (done) {
queue = Queue('test');
var job = queue.createJob({foo: 'bar'});
job.save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
});
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone(null, 'baz');
queue.on('failed', function (job) {
assert.ok(job);
queue.checkHealth(function (healthErr, counts) {
assert.isNull(healthErr);
assert.strictEqual(counts.failed, 1);
done();
});
});
});
});
queue.add({foo: 'bar'}, function (err, job) {
assert.isNull(err);
assert.ok(job.jobId);
assert.strictEqual(job.data.foo, 'bar');
describe('getJob', function () {
it('gets an job created by the same queue instance', function (done) {
queue = Queue('test');
var createdJob = queue.createJob({foo: 'bar'});
createdJob.save(function (err, createdJob) {
assert.isNull(err);
assert.ok(createdJob.id);
queue.getJob(createdJob.id, function (getErr, job) {
assert.isNull(getErr);
assert.strictEqual(job.toData(), createdJob.toData());
done();
});
});
});
queue.on('succeeded', function (job, data) {
assert.ok(job);
assert.strictEqual(data, 'baz');
done();
it('gets a job created by another queue instance', function (done) {
queue = Queue('test');
var reader = Queue('test');
var job = queue.createJob({foo: 'bar'});
job.save(function (err, createdJob) {
assert.isNull(err);
assert.ok(createdJob.id);
reader.getJob(createdJob.id, function (getErr, job) {
assert.isNull(getErr);
assert.strictEqual(job.toData(), createdJob.toData());
done();
});
});
});
});
it('processes many jobs in a row with one processor', function (done) {
queue = Queue('test');
var counter = 0;
var numJobs = 20;
describe('Processing jobs', function () {
it('processes a job', function (done) {
queue = Queue('test');
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.count, counter);
counter++;
jobDone();
if (counter === numJobs) {
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone(null, 'baz');
});
var job = queue.createJob({foo: 'bar'});
job.save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
});
queue.on('succeeded', function (job, data) {
assert.ok(job);
assert.strictEqual(data, 'baz');
job.isInSet('succeeded', function (err, isMember) {
assert.isTrue(isMember);
done();
});
});
});
it('processes a job with removeOnSuccess', function (done) {
queue = Queue('test', {
removeOnSuccess: true
});
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone(null);
});
queue.createJob({foo: 'bar'}).save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
});
queue.on('succeeded', function (job) {
queue.client.hget(queue.toKey('jobs'), job.id, function (err, jobData) {
assert.isNull(err);
assert.isNull(jobData);
done();
});
});
});
it('processes a job that fails', function (done) {
queue = Queue('test');
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone(Error('failed!'));
});
var job = queue.createJob({foo: 'bar'});
job.save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
});
queue.on('failed', function (job, err) {
assert.ok(job);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(err.message, 'failed!');
job.isInSet('failed', function (err, isMember) {
assert.isTrue(isMember);
done();
});
});
});
it('processes a job that throws an exception', function (done) {
queue = Queue('test', {
catchExceptions: true
});
queue.process(function (job) {
assert.strictEqual(job.data.foo, 'bar');
throw Error('exception!');
});
queue.createJob({foo: 'bar'}).save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
});
queue.on('failed', function (job, err) {
assert.ok(job);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(err.message, 'exception!');
done();
});
});
it('processes and retries a job that fails', function (done) {
queue = Queue('test');
var callCount = 0;
queue.process(function (job, jobDone) {
callCount++;
assert.strictEqual(job.data.foo, 'bar');
if (callCount > 1) {
return jobDone();
} else {
return jobDone(Error('failed!'));
}
});
queue.on('failed', function (job, err) {
assert.ok(job);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(err.message, 'failed!');
job.retry();
});
queue.on('succeeded', function () {
assert.strictEqual(callCount, 2);
done();
});
queue.createJob({foo: 'bar'}).save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
});
});
it('processes a job that times out', function (done) {
queue = Queue('test');
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
setTimeout(jobDone, 20);
});
queue.createJob({foo: 'bar'}).timeout(10).save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(job.options.timeout, 10);
});
queue.on('failed', function (job, err) {
assert.ok(job);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(err.message, 'Job 1 timed out (10 ms)');
done();
});
});
it('processes a job that auto-retries', function (done) {
queue = Queue('test');
var failCount = 0;
var retries = 1;
var failMsg = 'failing to auto-retry...';
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
if (job.options.retries === 0) {
assert.strictEqual(failCount, retries);
jobDone();
done();
} else {
jobDone(Error(failMsg));
}
});
queue.createJob({foo: 'bar'}).retries(retries).save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(job.options.retries, retries);
});
queue.on('failed', function (job, err) {
failCount += 1;
assert.ok(job);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(err.message, failMsg);
});
});
it('processes a job that times out and auto-retries', function (done) {
queue = Queue('test');
var failCount = 0;
var retries = 1;
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
if (job.options.retries === 0) {
assert.strictEqual(failCount, retries);
jobDone();
done();
} else {
setTimeout(jobDone, 20);
}
});
queue.createJob({foo: 'bar'}).timeout(10).retries(retries).save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(job.options.retries, retries);
});
queue.on('failed', function (job) {
failCount += 1;
assert.ok(job);
assert.strictEqual(job.data.foo, 'bar');
});
});
it('Refuses to process when isWorker is false', function (done) {
queue = Queue('test', {
isWorker: false
});
try {
queue.process();
} catch (err) {
assert.strictEqual(err.message, 'Cannot call Queue.prototype.process on a non-worker');
done();
}
});
for (var i = 0; i < numJobs; i++) {
queue.add({count: i});
}
it('Refuses to be called twice', function (done) {
queue = Queue('test');
queue.process(function () {});
try {
queue.process();
} catch (err) {
assert.strictEqual(err.message, 'Cannot call Queue.prototype.process twice');
done();
}
});
});
it('processes many jobs with one concurrent processor', function (done) {
queue = Queue('test');
var counter = 0;
var concurrency = 5;
var numJobs = 20;
describe('Processing many jobs', function () {
it('processes many jobs in a row with one processor', function (done) {
queue = Queue('test');
var counter = 0;
var numJobs = 20;
queue.process(concurrency, function (job, jobDone) {
assert.isTrue(queue.running <= concurrency);
setTimeout(function () {
jobDone();
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.count, counter);
counter++;
jobDone();
if (counter === numJobs) {
done();
}
}, 20);
});
for (var i = 0; i < numJobs; i++) {
queue.createJob({count: i}).save();
}
});
for (var i = 0; i < numJobs; i++) {
queue.add({count: i});
}
});
it('processes many jobs with one concurrent processor', function (done) {
queue = Queue('test');
var counter = 0;
var concurrency = 5;
var numJobs = 20;
it('processes many randomly delayed jobs with one concurrent processor', function (done) {
queue = Queue('test');
var counter = 0;
var concurrency = 5;
var numJobs = 20;
queue.process(concurrency, function (job, jobDone) {
assert.isTrue(queue.running <= concurrency);
setTimeout(function () {
jobDone();
assert.strictEqual(job.data.count, counter);
counter++;
if (counter === numJobs) {
done();
}
}, 10);
});
queue.process(concurrency, function (job, jobDone) {
assert.isTrue(queue.running <= concurrency);
setTimeout(function () {
for (var i = 0; i < numJobs; i++) {
queue.createJob({count: i}).save();
}
});
it('processes many randomly delayed jobs with one concurrent processor', function (done) {
queue = Queue('test');
var counter = 0;
var concurrency = 5;
var numJobs = 20;
queue.process(concurrency, function (job, jobDone) {
assert.isTrue(queue.running <= concurrency);
setTimeout(function () {
jobDone();
counter++;
if (counter === numJobs) {
done();
}
}, 10);
});
var addJob = function (i) {
queue.createJob({count: i}).save();
};
for (var i = 0; i < numJobs; i++) {
setTimeout(addJob.bind(null, i), Math.random() * 50);
}
});
it('processes many jobs with multiple processors', function (done) {
queue = Queue('test');
var processors = [
Queue('test'),
Queue('test'),
Queue('test')
];
var counter = 0;
var numJobs = 20;
var processed = [];
var handleJob = function (job, jobDone) {
counter++;
processed[job.data.count] = true;
jobDone();
counter++;
if (counter === numJobs) {
done();
for (var i = 0; i < numJobs; i++) {
assert.isTrue(processed[i]);
}
var reportClosed = barrier(3, done);
processors.forEach(function (queue) {
queue.close(reportClosed);
});
}
}, 20);
};
processors.forEach(function (queue) {
queue.process(handleJob);
});
for (var i = 0; i < numJobs; i++) {
queue.createJob({count: i}).save();
}
});
for (var i = 0; i < numJobs; i++) {
setTimeout(queue.add.bind(queue, {count: i}), Math.random() * 75);
}
});
it('processes many jobs with multiple processors', function (done) {
queue = Queue('test');
var processors = [
Queue('test'),
Queue('test'),
Queue('test')
];
var counter = 0;
var numJobs = 20;
describe('Resets', function () {
it('resets and processes stalled jobs when starting a queue', function (done) {
var deadQueue = Queue('test', {
stallInterval: 0
});
var handleJob = function (job, jobDone) {
assert.strictEqual(job.data.count, counter);
jobDone();
var processJobs = function () {
queue = Queue('test', {
stallInterval: 0
});
var reportDone = barrier(3, done);
queue.checkStalledJobs(function () {
queue.process(function (job, jobDone) {
jobDone();
reportDone();
});
});
};
counter++;
if (counter === numJobs) {
var reportClosed = barrier(3, done);
processors.forEach(function (queue) {
queue.close(reportClosed);
var processAndClose = function () {
deadQueue.process(function () {
deadQueue.close(processJobs);
});
}
};
};
processors.forEach(function (queue) {
queue.process(handleJob);
var reportAdded = barrier(3, processAndClose);
deadQueue.createJob({foo: 'bar1'}).save(reportAdded);
deadQueue.createJob({foo: 'bar2'}).save(reportAdded);
deadQueue.createJob({foo: 'bar3'}).save(reportAdded);
});
for (var i = 0; i < numJobs; i++) {
queue.add({count: i});
}
});
it('resets and processes jobs from multiple stalled queues', function (done) {
var processJobs = function () {
queue = Queue('test', {
stallInterval: 0
});
var reportDone = barrier(5, done);
queue.checkStalledJobs(function () {
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone();
reportDone();
});
});
};
it('processes a job that fails', function (done) {
queue = Queue('test');
var reportClosed = barrier(5, processJobs);
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone(Error('failed!'));
});
var createAndStall = function () {
var queue = Queue('test', {
stallInterval: 0
});
queue.createJob({foo: 'bar'}).save(function () {
queue.process(function () {
queue.close(reportClosed);
});
});
};
queue.add({foo: 'bar'}, function (err, job) {
assert.isNull(err);
assert.ok(job.jobId);
assert.strictEqual(job.data.foo, 'bar');
for (var i = 0; i < 5; i++) {
createAndStall();
}
});
queue.on('failed', function (job, err) {
assert.ok(job);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(err.message, 'failed!');
done();
it('resets and processes stalled jobs from concurrent processor', function (done) {
var deadQueue = Queue('test', {
stallInterval: 0
});
var counter = 0;
var concurrency = 5;
var numJobs = 10;
var processJobs = function () {
queue = Queue('test', {
stallInterval: 0
});
queue.checkStalledJobs(function () {
queue.process(function (job, jobDone) {
counter += 1;
jobDone();
if (counter === numJobs) {
done();
}
});
});
};
var processAndClose = function () {
deadQueue.process(concurrency, function () {
// wait for it to get all spooled up...
if (deadQueue.running === concurrency) {
deadQueue.close(processJobs);
}
});
};
var reportAdded = barrier(numJobs, processAndClose);
for (var i = 0; i < numJobs; i++) {
deadQueue.createJob({count: i}).save(reportAdded);
}
});
});
it('processes a job that throws an exception', function (done) {
queue = Queue('test', {
catchExceptions: true
it('should reset without a callback', function (done) {
var deadQueue = Queue('test', {
stallInterval: 0
});
var processJobs = function () {
queue = Queue('test', {
stallInterval: 0
});
var reportDone = barrier(3, done);
queue.checkStalledJobs();
setTimeout(function () {
queue.process(function (job, jobDone) {
reportDone();
jobDone();
});
}, 20);
};
var processAndClose = function () {
deadQueue.process(function () {
deadQueue.close(processJobs);
});
};
var reportAdded = barrier(3, processAndClose);
deadQueue.createJob({foo: 'bar1'}).save(reportAdded);
deadQueue.createJob({foo: 'bar2'}).save(reportAdded);
deadQueue.createJob({foo: 'bar3'}).save(reportAdded);
});
queue.process(function (job) {
assert.strictEqual(job.data.foo, 'bar');
throw Error('exception!');
it('should reset with an interval', function (done) {
var deadQueue = Queue('test', {
stallInterval: 0
});
var processJobs = function () {
queue = Queue('test', {
stallInterval: 0
});
var reportDone = barrier(6, done);
queue.checkStalledJobs(10, reportDone);
setTimeout(function () {
queue.process(function (job, jobDone) {
reportDone();
jobDone();
});
}, 20);
};
var processAndClose = function () {
deadQueue.process(function () {
deadQueue.close(processJobs);
});
};
var reportAdded = barrier(3, processAndClose);
deadQueue.createJob({foo: 'bar1'}).save(reportAdded);
deadQueue.createJob({foo: 'bar2'}).save(reportAdded);
deadQueue.createJob({foo: 'bar3'}).save(reportAdded);
});
});
queue.add({foo: 'bar'}, function (err, job) {
assert.isNull(err);
assert.ok(job.jobId);
assert.strictEqual(job.data.foo, 'bar');
describe('Startup', function () {
it('processes pre-existing jobs when starting a queue', function (done) {
var deadQueue = Queue('test');
var processJobs = function () {
queue = Queue('test');
var jobCount = 0;
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar' + (++jobCount));
jobDone();
if (jobCount === 3) {
done();
}
});
};
var reportAdded = barrier(3, deadQueue.close.bind(deadQueue, processJobs));
deadQueue.createJob({foo: 'bar1'}).save(reportAdded);
deadQueue.createJob({foo: 'bar2'}).save(reportAdded);
deadQueue.createJob({foo: 'bar3'}).save(reportAdded);
});
queue.on('failed', function (job, err) {
assert.ok(job);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(err.message, 'exception!');
done();
it('does not process an in-progress job when a new queue starts', function (done) {
queue = Queue('test');
queue.createJob({foo: 'bar'}).save(function () {
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
setTimeout(jobDone, 30);
});
var queue2 = Queue('test');
setTimeout(function () {
queue2.process(function () {
assert.fail('queue2 should not process a job');
});
queue.on('succeeded', queue2.close.bind(queue2, done));
}, 10);
});
});
});
it('resets and processes stalled jobs when starting a queue', function (done) {
var deadQueue = Queue('test', {
lockTimeout: 10
describe('Pubsub events', function () {
it('emits a job succeeded event', function (done) {
queue = Queue('test');
var worker = Queue('test');
var queueEvent = false;
var job = queue.createJob({foo: 'bar'});
job.once('succeeded', function (result) {
assert.isTrue(queueEvent);
assert.strictEqual(result, 'barbar');
worker.close(done);
});
queue.once('job succeeded', function (jobId, result) {
queueEvent = true;
assert.strictEqual(jobId, job.id);
assert.strictEqual(result, 'barbar');
});
job.save();
worker.process(function (job, jobDone) {
jobDone(null, job.data.foo + job.data.foo);
});
});
var processJobs = function () {
it('emits a job succeeded event with no result', function (done) {
queue = Queue('test');
var jobCount = 0;
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar' + (++jobCount));
jobDone();
if (jobCount === 3) {
done();
}
var worker = Queue('test');
var queueEvent = false;
var job = queue.createJob({foo: 'bar'});
job.on('succeeded', function (result) {
assert.isTrue(queueEvent);
assert.strictEqual(result, undefined);
worker.close(done);
});
};
queue.once('job succeeded', function (jobId, result) {
queueEvent = true;
assert.strictEqual(jobId, job.id);
assert.strictEqual(result, undefined);
});
job.save();
var processAndClose = function () {
deadQueue.process(function () {
deadQueue.close(function () {
setTimeout(processJobs, 15);
});
worker.process(function (job, jobDone) {
jobDone(null);
});
};
});
var reportAdded = barrier(3, processAndClose);
it('emits a job failed event', function (done) {
queue = Queue('test');
var worker = Queue('test');
var queueEvent = false;
deadQueue.add({foo: 'bar1'}, reportAdded);
deadQueue.add({foo: 'bar2'}, reportAdded);
deadQueue.add({foo: 'bar3'}, reportAdded);
});
var job = queue.createJob({foo: 'bar'});
job.on('failed', function (err) {
assert.isTrue(queueEvent);
assert.strictEqual(err.message, 'fail!');
worker.close(done);
});
queue.once('job failed', function (jobId, err) {
queueEvent = true;
assert.strictEqual(jobId, job.id);
assert.strictEqual(err.message, 'fail!');
});
job.save();
it('resets and processes jobs from multiple stalled queues', function (done) {
var processJobs = function () {
queue = Queue('test');
var reportDone = barrier(5, done);
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone();
reportDone();
worker.process(function (job, jobDone) {
jobDone(Error('fail!'));
});
};
});
var reportClosed = barrier(5, setTimeout.bind(null, processJobs, 15));
it('emits a job progress event', function (done) {
queue = Queue('test');
var worker = Queue('test');
var reportedProgress = false;
var queueEvent = false;
var createAndStall = function () {
var queue = Queue('test', {
lockTimeout: 10
var job = queue.createJob({foo: 'bar'});
job.on('progress', function (progress) {
assert.isTrue(queueEvent);
assert.strictEqual(progress, 20);
reportedProgress = true;
});
queue.add({foo: 'bar'}, function () {
queue.process(function () {
queue.close(reportClosed);
});
queue.once('job progress', function (jobId, progress) {
queueEvent = true;
assert.strictEqual(jobId, job.id);
assert.strictEqual(progress, 20);
});
};
for (var i = 0; i < 5; i++) {
createAndStall();
}
});
it('resets and processes stalled jobs from concurrent processor', function (done) {
var deadQueue = Queue('test', {
lockTimeout: 10
job.on('succeeded', function () {
assert.isTrue(reportedProgress);
assert.isTrue(queueEvent);
worker.close(done);
});
job.save();
worker.process(function (job, jobDone) {
job.reportProgress(20);
setTimeout(jobDone, 20);
});
});
var counter = 0;
var concurrency = 5;
var numJobs = 10;
var processJobs = function () {
it('emits a job retrying event', function (done) {
queue = Queue('test');
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.count, counter++);
jobDone();
if (counter === numJobs) {
done();
}
var worker = Queue('test');
var retried = false;
var queueEvent = false;
var job = queue.createJob({foo: 'bar'}).retries(1);
job.on('retrying', function (err) {
assert.strictEqual(job.options.retries, 0);
assert.strictEqual(err.message, 'failing to retry');
});
};
queue.once('job retrying', function (jobId, err) {
queueEvent = true;
assert.strictEqual(jobId, job.id);
assert.strictEqual(err.message, 'failing to retry');
});
job.on('succeeded', function (result) {
assert.isTrue(retried);
assert.isTrue(queueEvent);
assert.strictEqual(result, 'retried');
worker.close(done);
});
job.save();
var processAndClose = function () {
deadQueue.process(concurrency, function () {
// wait for it to get all spooled up...
if (deadQueue.running === concurrency) {
deadQueue.close(function () {
setTimeout(processJobs, 15);
});
worker.process(function (job, jobDone) {
if (retried) {
jobDone(null, 'retried');
} else {
retried = true;
jobDone(Error('failing to retry'));
}
});
};
});
var reportAdded = barrier(numJobs, processAndClose);
it('are not received when getEvents is false', function (done) {
queue = Queue('test', {
getEvents: false
});
var worker = Queue('test');
for (var i = 0; i < numJobs; i++) {
deadQueue.add({count: i}, reportAdded);
}
});
assert.isUndefined(queue.eclient);
it('processes pre-existing jobs when starting a queue', function (done) {
var deadQueue = Queue('test', {
lockTimeout: 10
var job = queue.createJob({foo: 'bar'});
job.on('succeeded', function () {
assert.fail();
});
job.save();
worker.process(function (job, jobDone) {
jobDone(null, job.data.foo);
setTimeout(worker.close.bind(worker, done), 20);
});
});
var processJobs = function () {
it('are not sent when sendEvents is false', function (done) {
queue = Queue('test');
var jobCount = 0;
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar' + (++jobCount));
jobDone();
if (jobCount === 3) {
done();
}
var worker = Queue('test', {
sendEvents: false
});
};
var reportAdded = barrier(3, deadQueue.close.bind(deadQueue, processJobs));
var job = queue.createJob({foo: 'bar'});
job.on('succeeded', function () {
assert.fail();
});
job.save();
deadQueue.add({foo: 'bar1'}, reportAdded);
deadQueue.add({foo: 'bar2'}, reportAdded);
deadQueue.add({foo: 'bar3'}, reportAdded);
worker.process(function (job, jobDone) {
jobDone(null, job.data.foo);
setTimeout(worker.close.bind(worker, done), 20);
});
});
it('properly emits events with multiple jobs', function (done) {
queue = Queue('test');
var worker = Queue('test');
var reported = 0;
var jobIdSum = 0;
var job1 = queue.createJob({foo: 'bar'});
var job2 = queue.createJob({foo: 'baz'});
job1.on('succeeded', function (result) {
reported += 1;
assert.strictEqual(result, 'barbar');
});
job2.on('succeeded', function (result) {
reported += 1;
assert.strictEqual(result, 'bazbaz');
});
queue.on('job succeeded', function (id) {
jobIdSum += id;
});
job1.save();
job2.save();
worker.process(function (job, jobDone) {
jobDone(null, job.data.foo + job.data.foo);
setTimeout(function () {
assert.strictEqual(jobIdSum, 3);
assert.strictEqual(reported, 2);
worker.close(done);
}, 20);
});
});
});
it('does not process a locked job when a new queue starts', function (done) {
queue = Queue('test');
queue.add({foo: 'bar'}, function () {
describe('Destroy', function () {
it('should remove all associated redis keys', function (done) {
queue = Queue('test');
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
setTimeout(jobDone, 30);
jobDone();
});
var queue2 = Queue('test');
setTimeout(function () {
queue2.process(function () {
assert.fail('queue2 should not process a job');
queue.createJob({foo: 'bar'}).save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
});
var checkForKeys = function (err) {
assert.isNull(err);
queue.client.keys(queue.toKey('*'), function (keysErr, keys) {
assert.isNull(keysErr);
assert.deepEqual(keys, []);
done();
});
queue.on('succeeded', queue2.close.bind(queue2, done));
}, 10);
};
queue.on('succeeded', function (job) {
assert.ok(job);
queue.destroy(checkForKeys);
});
});
});
it('retries a job that fails', function (done) {
queue = Queue('test');
var callCount = 0;
it('should work without a callback', function (done) {
queue = Queue('test');
queue.process(function (job, jobDone) {
callCount++;
assert.strictEqual(job.data.foo, 'bar');
if (callCount > 1) {
return jobDone();
} else {
return jobDone(Error('failed!'));
}
});
queue.process(function (job, jobDone) {
assert.strictEqual(job.data.foo, 'bar');
jobDone();
});
queue.on('failed', function (job, err) {
assert.ok(job);
assert.strictEqual(job.data.foo, 'bar');
assert.strictEqual(err.message, 'failed!');
job.retry();
});
queue.createJob({foo: 'bar'}).save(function (err, job) {
assert.isNull(err);
assert.ok(job.id);
assert.strictEqual(job.data.foo, 'bar');
});
queue.on('succeeded', function () {
assert.strictEqual(callCount, 2);
done();
});
var checkForKeys = function () {
queue.client.keys(queue.toKey('*'), function (err, keys) {
assert.isNull(err);
assert.deepEqual(keys, []);
done();
});
};
queue.add({foo: 'bar'}, function (err, job) {
assert.isNull(err);
assert.ok(job.jobId);
assert.strictEqual(job.data.foo, 'bar');
queue.on('succeeded', function (job) {
assert.ok(job);
queue.destroy();
setTimeout(checkForKeys, 20);
});
});

@@ -458,0 +1116,0 @@ });

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc